diff --git a/airbyte/_util/api_imports.py b/airbyte/_util/api_imports.py index 19935b6f..223262f2 100644 --- a/airbyte/_util/api_imports.py +++ b/airbyte/_util/api_imports.py @@ -23,6 +23,7 @@ # These classes are used internally to cache API responses. from airbyte_api.models import ( ConnectionResponse, + SourceResponse, DestinationResponse, JobResponse, ) @@ -32,11 +33,16 @@ # This class is used to represent the status of a job. It may be used in # type hints for public functions that return a job status. from airbyte_api.models import JobStatusEnum # Alias not needed +from airbyte_api.models import ActorTypeEnum as ConnectorTypeEnum __all__: list[str] = [ + # Internal-Use Classes "ConnectionResponse", "DestinationResponse", "JobResponse", + "SourceResponse", + # Public-Use Classes + "ConnectorTypeEnum", "JobStatusEnum", ] diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py deleted file mode 100644 index 6cc5411a..00000000 --- a/airbyte/_util/api_util.py +++ /dev/null @@ -1,547 +0,0 @@ -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -"""These internal functions are used to interact with the Airbyte API (module named `airbyte`). - -In order to insulate users from breaking changes and to avoid general confusion around naming -and design inconsistencies, we do not expose these functions or other Airbyte API classes within -PyAirbyte. Classes and functions from the Airbyte API external library should always be wrapped in -PyAirbyte classes - unless there's a very compelling reason to surface these models intentionally. - -Similarly, modules outside of this file should try to avoid interfacing with `airbyte_api` library -directly. This will ensure a single source of truth when mapping between the `airbyte` and -`airbyte_api` libraries. 
-""" - -from __future__ import annotations - -import json -from typing import Any - -import airbyte_api -from airbyte_api import api, models - -from airbyte.exceptions import ( - AirbyteConnectionSyncError, - AirbyteError, - AirbyteMissingResourceError, - AirbyteMultipleResourcesError, -) - - -JOB_WAIT_INTERVAL_SECS = 2.0 -JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour -CLOUD_API_ROOT = "https://api.airbyte.com/v1" - -# Helper functions - - -def status_ok(status_code: int) -> bool: - """Check if a status code is OK.""" - return status_code >= 200 and status_code < 300 # noqa: PLR2004 # allow inline magic numbers - - -def get_airbyte_server_instance( - *, - api_key: str, - api_root: str, -) -> airbyte_api.Airbyte: - """Get an Airbyte instance.""" - return airbyte_api.AirbyteAPI( - security=models.Security( - bearer_auth=api_key, - ), - server_url=api_root, - ) - - -# Get workspace - - -def get_workspace( - workspace_id: str, - *, - api_root: str, - api_key: str, -) -> models.WorkspaceResponse: - """Get a connection.""" - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response = airbyte_instance.workspaces.get_workspace( - api.GetWorkspaceRequest( - workspace_id=workspace_id, - ), - ) - if status_ok(response.status_code) and response.workspace_response: - return response.workspace_response - - raise AirbyteMissingResourceError( - resource_type="workspace", - context={ - "workspace_id": workspace_id, - "response": response, - }, - ) - - -# List, get, and run connections - - -def list_connections( - workspace_id: str, - *, - api_root: str, - api_key: str, -) -> list[api.ConnectionResponse]: - """Get a connection.""" - _ = workspace_id # Not used (yet) - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response = airbyte_instance.connections.list_connections( - api.ListConnectionsRequest()( - workspace_ids=[workspace_id], - ), - ) - - if status_ok(response.status_code) and response.connections_response: - return response.connections_response.data - - raise AirbyteError( - context={ - "workspace_id": workspace_id, - "response": response, - } - ) - - -def get_connection( - workspace_id: str, - connection_id: str, - *, - api_root: str, - api_key: str, -) -> api.ConnectionResponse: - """Get a connection.""" - _ = workspace_id # Not used (yet) - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response = airbyte_instance.connections.get_connection( - api.GetConnectionRequest( - connection_id=connection_id, - ), - ) - if status_ok(response.status_code) and response.connection_response: - return response.connection_response - - raise AirbyteMissingResourceError(connection_id, "connection", response.text) - - -def run_connection( - workspace_id: str, - connection_id: str, - *, - api_root: str, - api_key: str, -) -> api.ConnectionResponse: - """Get a connection. - - If block is True, this will block until the connection is finished running. - - If raise_on_failure is True, this will raise an exception if the connection fails. 
- """ - _ = workspace_id # Not used (yet) - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response = airbyte_instance.jobs.create_job( - models.JobCreateRequest( - connection_id=connection_id, - job_type=models.JobTypeEnum.SYNC, - ), - ) - if status_ok(response.status_code) and response.job_response: - return response.job_response - - raise AirbyteConnectionSyncError( - connection_id=connection_id, - context={ - "workspace_id": workspace_id, - }, - response=response, - ) - - -# Get job info (logs) - - -def get_job_logs( - workspace_id: str, - connection_id: str, - limit: int = 20, - *, - api_root: str, - api_key: str, -) -> list[api.JobResponse]: - """Get a job's logs.""" - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response: api.ListJobsResponse = airbyte_instance.jobs.list_jobs( - api.ListJobsRequest( - workspace_ids=[workspace_id], - connection_id=connection_id, - limit=limit, - ), - ) - if status_ok(response.status_code) and response.jobs_response: - return response.jobs_response.data - - raise AirbyteMissingResourceError( - response=response, - resource_type="job", - context={ - "workspace_id": workspace_id, - "connection_id": connection_id, - }, - ) - - -def get_job_info( - job_id: str, - *, - api_root: str, - api_key: str, -) -> api.JobResponse: - """Get a job.""" - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response = airbyte_instance.jobs.get_job( - api.GetJobRequest( - job_id=job_id, - ), - ) - if status_ok(response.status_code) and response.job_response: - return response.job_response - - raise AirbyteMissingResourceError(job_id, "job", response.text) - - -# Create, get, and delete sources - - -def create_source( - name: str, - *, - workspace_id: str, - config: dict[str, Any], - api_root: str, - api_key: str, -) -> api.SourceResponse: - """Get a connection.""" - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response: api.CreateSourceResponse = airbyte_instance.sources.create_source( - models.SourceCreateRequest( - name=name, - workspace_id=workspace_id, - configuration=config, # TODO: wrap in a proper configuration object - definition_id=None, # Not used alternative to config.sourceType. 
- secret_id=None, # For OAuth, not yet supported - ), - ) - if status_ok(response.status_code) and response.source_response: - return response.source_response - - raise AirbyteError( - message="Could not create source.", - response=response, - ) - - -def get_source( - source_id: str, - *, - api_root: str, - api_key: str, -) -> api.SourceResponse: - """Get a connection.""" - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response = airbyte_instance.sources.get_source( - api.GetSourceRequest( - source_id=source_id, - ), - ) - if status_ok(response.status_code) and response.connection_response: - return response.connection_response - - raise AirbyteMissingResourceError(source_id, "source", response.text) - - -def delete_source( - source_id: str, - *, - api_root: str, - api_key: str, - workspace_id: str | None = None, -) -> None: - """Delete a source.""" - _ = workspace_id # Not used (yet) - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response = airbyte_instance.sources.delete_source( - api.DeleteSourceRequest( - source_id=source_id, - ), - ) - if not status_ok(response.status_code): - raise AirbyteError( - context={ - "source_id": source_id, - "response": response, - }, - ) - - -# Create, get, and delete destinations - - -def create_destination( - name: str, - *, - workspace_id: str, - config: dict[str, Any], - api_root: str, - api_key: str, -) -> api.DestinationResponse: - """Get a connection.""" - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response: api.CreateDestinationResponse = airbyte_instance.destinations.create_destination( - models.DestinationCreateRequest( - name=name, - workspace_id=workspace_id, - configuration=config, # TODO: wrap in a proper configuration object - ), - ) - if status_ok(response.status_code) and response.destination_response: - return response.destination_response - - raise AirbyteError( - message="Could not create destination.", - response=response, - ) - - -def get_destination( - destination_id: str, - *, - api_root: str, - api_key: str, -) -> api.DestinationResponse: - """Get a connection.""" - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response = airbyte_instance.destinations.get_destination( - api.GetDestinationRequest( - destination_id=destination_id, - ), - ) - if status_ok(response.status_code): - # TODO: This is a temporary workaround to resolve an issue where - # the destination API response is of the wrong type. 
- raw_response: dict[str, Any] = json.loads(response.raw_response.text) - raw_configuration: dict[str, Any] = raw_response["configuration"] - destination_type = raw_response.get("destinationType") - if destination_type == "snowflake": - response.destination_response.configuration = models.DestinationSnowflake.from_dict( - raw_configuration, - ) - if destination_type == "bigquery": - response.destination_response.configuration = models.DestinationBigquery.from_dict( - raw_configuration, - ) - if destination_type == "postgres": - response.destination_response.configuration = models.DestinationPostgres.from_dict( - raw_configuration, - ) - if destination_type == "duckdb": - response.destination_response.configuration = models.DestinationDuckdb.from_dict( - raw_configuration, - ) - - return response.destination_response - - raise AirbyteMissingResourceError(destination_id, "destination", response.text) - - -def delete_destination( - destination_id: str, - *, - api_root: str, - api_key: str, - workspace_id: str | None = None, -) -> None: - """Delete a destination.""" - _ = workspace_id # Not used (yet) - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response = airbyte_instance.destinations.delete_destination( - api.DeleteDestinationRequest( - destination_id=destination_id, - ), - ) - if not status_ok(response.status_code): - raise AirbyteError( - context={ - "destination_id": destination_id, - "response": response, - }, - ) - - -# Create and delete connections - - -def create_connection( - name: str, - *, - source_id: str, - destination_id: str, - api_root: str, - api_key: str, - workspace_id: str | None = None, - prefix: str, - selected_stream_names: list[str], -) -> models.ConnectionResponse: - _ = workspace_id # Not used (yet) - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - stream_configurations: list[models.StreamConfiguration] = [] - if selected_stream_names: - for stream_name in selected_stream_names: - stream_configuration = models.StreamConfiguration( - name=stream_name, - ) - stream_configurations.append(stream_configuration) - - stream_configurations = models.StreamConfigurations(stream_configurations) - response = airbyte_instance.connections.create_connection( - models.ConnectionCreateRequest( - name=name, - source_id=source_id, - destination_id=destination_id, - configurations=stream_configurations, - prefix=prefix, - ), - ) - if not status_ok(response.status_code): - raise AirbyteError( - context={ - "source_id": source_id, - "destination_id": destination_id, - "response": response, - }, - ) - - return response.connection_response - - -def get_connection_by_name( - workspace_id: str, - connection_name: str, - *, - api_root: str, - api_key: str, -) -> models.ConnectionResponse: - """Get a connection.""" - connections = list_connections( - workspace_id=workspace_id, - api_key=api_key, - api_root=api_root, - ) - found: list[models.ConnectionResponse] = [ - connection for connection in connections if connection.name == connection_name - ] - if len(found) == 0: - raise AirbyteMissingResourceError( - connection_name, "connection", f"Workspace: {workspace_id}" - ) - - if len(found) > 1: - raise AirbyteMultipleResourcesError( - resource_type="connection", - resource_name_or_id=connection_name, - context={ - "workspace_id": workspace_id, - "multiples": found, - }, - ) - - return found[0] - - -def delete_connection( - connection_id: str, - api_root: str, - workspace_id: str, - api_key: str, -) -> 
None: - _ = workspace_id # Not used (yet) - airbyte_instance = get_airbyte_server_instance( - api_key=api_key, - api_root=api_root, - ) - response = airbyte_instance.connections.delete_connection( - api.DeleteConnectionRequest( - connection_id=connection_id, - ), - ) - if not status_ok(response.status_code): - raise AirbyteError( - context={ - "connection_id": connection_id, - "response": response, - }, - ) - - -# Not yet implemented - - -# def check_source( -# source_id: str, -# *, -# api_root: str, -# api_key: str, -# workspace_id: str | None = None, -# ) -> api.SourceCheckResponse: -# """Check a source. - -# # TODO: Need to use legacy Configuration API for this: -# # https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/sources/check_connection -# """ -# _ = source_id, workspace_id, api_root, api_key -# raise NotImplementedError diff --git a/airbyte/_util/iter.py b/airbyte/_util/iter.py new file mode 100644 index 00000000..f14ed43c --- /dev/null +++ b/airbyte/_util/iter.py @@ -0,0 +1,81 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Utility functions for working with iterables.""" + +from __future__ import annotations + +from collections.abc import Iterable +from typing import TYPE_CHECKING, TypeVar + +from airbyte import exceptions as exc + + +if TYPE_CHECKING: + from collections.abc import Iterable, Iterator + + +T = TypeVar("T") + + +class EmptyIterableError(ValueError): + """An error raised when an iterable is unexpectedly empty.""" + + +class MultipleMatchesError(ValueError): + """An error raised when an iterable unexpectedly has more than one item.""" + + +def exactly_one( + iterable: Iterable[T], +) -> T: + """Return the only item in an iterable. + + If there is not exactly one item, raises `EmptyIterableError` or `MultipleMatchesError`, both + of which are subclasses of `ValueError`. + """ + it: Iterator[T] = iter(iterable) + try: + result: T = next(it) + except StopIteration: + raise EmptyIterableError from None + + try: + next(it) + except StopIteration: + return result + + raise MultipleMatchesError + + +def exactly_one_resource( + iterable: Iterable[T], +) -> T: + """Return the only item in an iterable. + + If there is not exactly one item, raises either `AirbyteMissingResourceError` or + `AirbyteMultipleResourcesError`. + """ + try: + return exactly_one(iterable) + except EmptyIterableError: + raise exc.AirbyteMissingResourceError( + message="Expected exactly one resource, but found none.", + ) from None + except MultipleMatchesError: + raise exc.AirbyteMultipleResourcesError( + message="Expected exactly one resource, but found multiple.", + ) from None + + +def no_existing_resources( + iterable: Iterable[T], +) -> None: + """Raise an error if any resource exists in an iterable. + + Raises `AirbyteResourceAlreadyExistsError` if any resource exists in the iterable. 
+ """ + try: + exactly_one(iterable) + except EmptyIterableError: + return + + raise exc.AirbyteResourceAlreadyExistsError diff --git a/airbyte/cloud/__init__.py b/airbyte/cloud/__init__.py index b599386d..6a93dcbd 100644 --- a/airbyte/cloud/__init__.py +++ b/airbyte/cloud/__init__.py @@ -56,8 +56,9 @@ from __future__ import annotations -from airbyte.cloud import connections, constants, sync_results, workspaces +from airbyte.cloud import connections, connectors, constants, sync_results, workspaces from airbyte.cloud.connections import CloudConnection +from airbyte.cloud.connectors import CloudConnector from airbyte.cloud.constants import JobStatusEnum from airbyte.cloud.sync_results import SyncResult from airbyte.cloud.workspaces import CloudWorkspace @@ -65,13 +66,15 @@ __all__ = [ # Submodules - "workspaces", "connections", + "connectors", "constants", "sync_results", + "workspaces", # Classes - "CloudWorkspace", "CloudConnection", + "CloudConnector", + "CloudWorkspace", "SyncResult", # Enums "JobStatusEnum", diff --git a/airbyte/cloud/_api_util.py b/airbyte/cloud/_api_util.py new file mode 100644 index 00000000..051eb952 --- /dev/null +++ b/airbyte/cloud/_api_util.py @@ -0,0 +1,859 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +"""These internal functions are used to interact with the Airbyte API (module named `airbyte`). + +In order to insulate users from breaking changes and to avoid general confusion around naming +and design inconsistencies, we do not expose these functions or other Airbyte API classes within +PyAirbyte. Classes and functions from the Airbyte API external library should always be wrapped in +PyAirbyte classes - unless there's a very compelling reason to surface these models intentionally. + +Similarly, modules outside of this file should try to avoid interfacing with `airbyte_api` library +directly. This will ensure a single source of truth when mapping between the `airbyte` and +`airbyte_api` libraries. 
+""" + +from __future__ import annotations + +import json +from enum import Enum +from typing import TYPE_CHECKING, Any, Callable + +import airbyte_api +import requests +from airbyte_api import api, models + +from airbyte import exceptions as exc +from airbyte._util.iter import exactly_one_resource +from airbyte.exceptions import ( + AirbyteConnectionSyncError, + AirbyteError, + AirbyteMissingResourceError, + PyAirbyteInternalError, +) + + +if TYPE_CHECKING: + from collections.abc import Iterator + + from airbyte.cloud.constants import ConnectorTypeEnum + + +JOB_WAIT_INTERVAL_SECS = 2.0 +JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour +CLOUD_API_ROOT = "https://api.airbyte.com/v1" +LEGACY_API_ROOT = "https://cloud.airbyte.com/api/v1" +DEFAULT_PAGE_SIZE = 30 + +# Helper functions + + +class ResourceTypeEnum(str, Enum): + """Resource types.""" + + CONNECTION = "connection" + SOURCE = "source" + DESTINATION = "destination" + JOB = "job" + WORKSPACE = "workspace" + + +def status_ok(status_code: int) -> bool: + """Check if a status code is OK.""" + return status_code >= 200 and status_code < 300 # noqa: PLR2004 # allow inline magic numbers + + +def get_airbyte_server_instance( + *, + api_key: str, + api_root: str, +) -> airbyte_api.Airbyte: + """Get an Airbyte instance.""" + return airbyte_api.AirbyteAPI( + security=models.Security( + bearer_auth=api_key, + ), + server_url=api_root, + ) + + +# Fetch Resource (Generic) + + +def fetch_resource_info( + resource_id: str, + resource_type: ResourceTypeEnum, + *, + api_root: str, + api_key: str, +) -> ( + None + | models.WorkspaceResponse + | models.ConnectionResponse + | models.SourceResponse + | models.DestinationResponse + | models.JobResponse +): + """Get a resource.""" + airbyte_instance: airbyte_api.AirbyteAPI = get_airbyte_server_instance( + api_key=api_key, + api_root=api_root, + ) + if resource_type == ResourceTypeEnum.CONNECTION: + raw_response = airbyte_instance.connections.get_connection( + api.GetConnectionRequest( + connection_id=resource_id, + ), + ) + if status_ok(raw_response.status_code) and raw_response.connection_response: + return raw_response.connection_response + + elif resource_type == ResourceTypeEnum.SOURCE: + raw_response = airbyte_instance.sources.get_source( + api.GetSourceRequest( + source_id=resource_id, + ), + ) + if status_ok(raw_response.status_code) and raw_response.source_response: + return raw_response.source_response + + elif resource_type == ResourceTypeEnum.DESTINATION: + raw_response = airbyte_instance.destinations.get_destination( + api.GetDestinationRequest( + destination_id=resource_id, + ), + ) + if status_ok(raw_response.status_code) and raw_response.destination_response: + # TODO: Remove this "fix" once the Airbyte API library is fixed. 
+ raw_response = _fix_destination_info(raw_response) + return raw_response.destination_response + + elif resource_type == ResourceTypeEnum.JOB: + raw_response = airbyte_instance.jobs.get_job( + api.GetJobRequest( + job_id=resource_id, + ), + ) + if status_ok(raw_response.status_code) and raw_response.job_response: + return raw_response.job_response + + elif resource_type == ResourceTypeEnum.WORKSPACE: + raw_response = airbyte_instance.workspaces.get_workspace( + api.GetWorkspaceRequest( + workspace_id=resource_id, + ), + ) + if status_ok(raw_response.status_code) and raw_response.workspace_response: + return raw_response.workspace_response + + else: + raise PyAirbyteInternalError( + message="Invalid resource type.", + context={ + "resource_type": resource_type, + }, + ) + + # If we reach this point, the resource was not found. + raise AirbyteMissingResourceError( + response=raw_response, + resource_name_or_id=resource_id, + resource_type=resource_type.value, + log_text=raw_response.text, + ) + + +def fetch_workspace_info( + workspace_id: str, + *, + api_root: str, + api_key: str, +) -> models.WorkspaceResponse: + """Get a connection.""" + return fetch_resource_info( + resource_id=workspace_id, + resource_type=ResourceTypeEnum.WORKSPACE, + api_key=api_key, + api_root=api_root, + ) + + +def fetch_connection_info( + connection_id: str, + *, + api_root: str, + api_key: str, +) -> models.ConnectionResponse: + """Get a connection.""" + return fetch_resource_info( + resource_id=connection_id, + resource_type=ResourceTypeEnum.CONNECTION, + api_key=api_key, + api_root=api_root, + ) + + +def fetch_source_info( + source_id: str, + *, + api_root: str, + api_key: str, +) -> models.SourceResponse: + """Get a source.""" + return fetch_resource_info( + resource_id=source_id, + resource_type=ResourceTypeEnum.SOURCE, + api_key=api_key, + api_root=api_root, + ) + + +def fetch_destination_info( + destination_id: str, + *, + api_root: str, + api_key: str, +) -> models.DestinationResponse: + """Get a destination.""" + return fetch_resource_info( + resource_id=destination_id, + resource_type=ResourceTypeEnum.DESTINATION, + api_key=api_key, + api_root=api_root, + ) + + +def _fix_destination_info( + response: models.DestinationResponse, +) -> models.DestinationResponse: + if status_ok(response.status_code): + # TODO: This is a temporary workaround to resolve an issue where + # the destination API response is of the wrong type. + raw_response: dict[str, Any] = json.loads(response.raw_response.text) + raw_configuration: dict[str, Any] = raw_response["configuration"] + destination_type = raw_response.get("destinationType") + if destination_type == "snowflake": + response.destination_response.configuration = models.DestinationSnowflake.from_dict( + raw_configuration, + ) + if destination_type == "bigquery": + response.destination_response.configuration = models.DestinationBigquery.from_dict( + raw_configuration, + ) + if destination_type == "postgres": + response.destination_response.configuration = models.DestinationPostgres.from_dict( + raw_configuration, + ) + if destination_type == "duckdb": + response.destination_response.configuration = models.DestinationDuckdb.from_dict( + raw_configuration, + ) + + return response + + raise NotImplementedError # TODO: Replace with a custom exception for this case. 
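+
+# Example usage of the generic fetcher above (the ID shown is a hypothetical
+# placeholder, not a real resource):
+#
+#     job = fetch_resource_info(
+#         resource_id="example-job-id",
+#         resource_type=ResourceTypeEnum.JOB,
+#         api_root=CLOUD_API_ROOT,
+#         api_key="...",
+#     )
+#
+# A 2xx response returns the typed `JobResponse` model; any other outcome raises
+# `AirbyteMissingResourceError` with the raw response attached for debugging.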
+
+
+def fetch_job_info(
+    job_id: str,
+    *,
+    api_root: str,
+    api_key: str,
+) -> models.JobResponse:
+    """Get a job."""
+    return fetch_resource_info(
+        resource_id=job_id,
+        resource_type=ResourceTypeEnum.JOB,
+        api_key=api_key,
+        api_root=api_root,
+    )
+
+
+# List connections, sources, and destinations
+
+
+def list_resources(
+    resource_type: ResourceTypeEnum,
+    *,
+    workspace_id: str,
+    parent_resource_id: str | None = None,
+    name_filter: str | Callable[[str], bool] | None = None,
+    api_root: str,
+    api_key: str,
+    page_size: int,
+    limit: int | None,
+) -> Iterator[
+    models.ConnectionResponse
+    | models.SourceResponse
+    | models.DestinationResponse
+    | models.JobResponse
+]:
+    """List resources of the given type, yielding them one at a time.
+
+    If name_filter is a string, only resources whose names contain that string will be
+    returned. If name_filter is a function, it will be called with each resource name and
+    should return a boolean.
+    """
+    airbyte_instance = get_airbyte_server_instance(
+        api_key=api_key,
+        api_root=api_root,
+    )
+    offset = 0
+    returned_count = 0
+
+    if isinstance(name_filter, str):
+        # Redefine name_filter as a function
+
+        def resource_filter_fn(resource: Any) -> bool:  # noqa: ANN401 # Intentional use of Any
+            return name_filter in resource.name
+    elif name_filter is None:
+        # "Always True" filter
+
+        def resource_filter_fn(resource: Any) -> bool:  # noqa: ANN401 # Intentional use of Any
+            _ = resource
+            return True
+    else:
+
+        def resource_filter_fn(resource: Any) -> bool:  # noqa: ANN401 # Intentional use of Any
+            return name_filter(resource.name)
+
+    if resource_type == ResourceTypeEnum.CONNECTION:
+        list_function = airbyte_instance.connections.list_connections
+        request = api.ListConnectionsRequest(
+            workspace_ids=[workspace_id],
+            include_deleted=False,
+            limit=page_size,
+            offset=offset,
+        )
+
+        def get_resources_from_response(
+            response: api.ListConnectionsResponse,
+        ) -> list[models.ConnectionResponse]:
+            return response.connections_response.data
+
+    elif resource_type == ResourceTypeEnum.SOURCE:
+        list_function = airbyte_instance.sources.list_sources
+        request = api.ListSourcesRequest(
+            workspace_ids=[workspace_id],
+            limit=page_size,
+            offset=offset,
+        )
+
+        def get_resources_from_response(
+            response: api.ListSourcesResponse,
+        ) -> list[api.SourceResponse]:
+            return response.sources_response.data
+
+    elif resource_type == ResourceTypeEnum.DESTINATION:
+        list_function = airbyte_instance.destinations.list_destinations
+        request = api.ListDestinationsRequest(
+            workspace_ids=[workspace_id],
+            limit=page_size,
+            offset=offset,
+        )
+
+        def get_resources_from_response(
+            response: api.ListDestinationsResponse,
+        ) -> list[models.DestinationResponse]:
+            return response.destinations_response.data
+
+    elif resource_type == ResourceTypeEnum.JOB:
+        list_function = airbyte_instance.jobs.list_jobs
+        request = api.ListJobsRequest(
+            workspace_ids=[workspace_id],
+            connection_id=parent_resource_id,
+            limit=page_size,
+            offset=offset,
+        )
+
+        def get_resources_from_response(
+            response: api.ListJobsResponse,
+        ) -> list[api.JobResponse]:
+            return response.jobs_response.data
+
+    else:
+        raise PyAirbyteInternalError(
+            message="Invalid resource type.",
+            context={
+                "resource_type": resource_type,
+            },
+        )
+
+    while limit is None or returned_count < limit:
+        request.offset = offset  # Apply the current page offset before each request.
+        response = list_function(request)
+
+        if not status_ok(response.status_code):
+            raise AirbyteError(
+                context={
+                    "workspace_id": workspace_id,
+                    "response": response,
+                }
+            )
+
+        resources = get_resources_from_response(response)
+        if not resources:
+            # No more resources to list
+            break
+
+        for resource in resources:
+            if resource_filter_fn(resource):
+                yield resource
+
+                returned_count += 1
+                if limit is not None and returned_count >= limit:
+                    break
+
+        offset += page_size
+
+    # Finished paging
+    return
+
+
+def list_sources(
+    workspace_id: str,
+    *,
+    name_filter: str | Callable[[str], bool] | None = None,
+    api_root: str,
+    api_key: str,
+    limit: int | None = None,
+) -> Iterator[api.SourceResponse]:
+    """List sources."""
+    return list_resources(
+        ResourceTypeEnum.SOURCE,
+        workspace_id=workspace_id,
+        name_filter=name_filter,
+        limit=limit,
+        api_key=api_key,
+        api_root=api_root,
+        page_size=DEFAULT_PAGE_SIZE,
+    )
+
+
+def list_destinations(
+    workspace_id: str,
+    *,
+    name_filter: str | Callable[[str], bool] | None = None,
+    api_root: str,
+    api_key: str,
+    limit: int | None = None,
+) -> Iterator[api.DestinationResponse]:
+    """List destinations."""
+    return list_resources(
+        ResourceTypeEnum.DESTINATION,
+        workspace_id=workspace_id,
+        name_filter=name_filter,
+        limit=limit,
+        api_key=api_key,
+        api_root=api_root,
+        page_size=DEFAULT_PAGE_SIZE,
+    )
+
+
+def list_connections(
+    workspace_id: str,
+    *,
+    name_filter: str | Callable[[str], bool] | None = None,
+    api_root: str,
+    api_key: str,
+    limit: int | None = None,
+) -> Iterator[models.ConnectionResponse]:
+    """List connections."""
+    return list_resources(
+        ResourceTypeEnum.CONNECTION,
+        workspace_id=workspace_id,
+        name_filter=name_filter,
+        limit=limit,
+        page_size=DEFAULT_PAGE_SIZE,
+        api_key=api_key,
+        api_root=api_root,
+    )
+
+
+def list_jobs(
+    workspace_id: str,
+    connection_id: str,
+    *,
+    api_root: str,
+    api_key: str,
+    limit: int | None = 30,
+) -> Iterator[api.JobResponse]:
+    """List jobs."""
+    return list_resources(
+        ResourceTypeEnum.JOB,
+        workspace_id=workspace_id,
+        parent_resource_id=connection_id,
+        limit=limit,
+        page_size=DEFAULT_PAGE_SIZE,
+        api_key=api_key,
+        api_root=api_root,
+    )
+
+
+# Get resources by name
+
+
+def fetch_connection_by_name(
+    workspace_id: str,
+    connection_name: str,
+    *,
+    api_root: str,
+    api_key: str,
+) -> models.ConnectionResponse:
+    """Get a connection by name.
+
+    Raises `AirbyteMissingResourceError` if the connection is not found or
+    `AirbyteMultipleResourcesError` if multiple connections are found.
+    """
+    return exactly_one_resource(
+        list_connections(
+            workspace_id=workspace_id,
+            name_filter=connection_name,
+            api_key=api_key,
+            api_root=api_root,
+            limit=2,
+        )
+    )
+
+
+def fetch_source_by_name(
+    workspace_id: str,
+    source_name: str,
+    *,
+    api_root: str,
+    api_key: str,
+) -> models.SourceResponse:
+    """Get a source by name.
+
+    Raises `AirbyteMissingResourceError` if the source is not found or
+    `AirbyteMultipleResourcesError` if multiple sources are found.
+    """
+    return exactly_one_resource(
+        list_sources(
+            workspace_id=workspace_id,
+            name_filter=source_name,
+            api_key=api_key,
+            api_root=api_root,
+            limit=2,
+        )
+    )
+
+
+def fetch_destination_by_name(
+    workspace_id: str,
+    destination_name: str,
+    *,
+    api_root: str,
+    api_key: str,
+) -> models.DestinationResponse:
+    """Get a destination by name.
+
+    Raises `AirbyteMissingResourceError` if the destination is not found or
+    `AirbyteMultipleResourcesError` if multiple destinations are found.
+    """
+    return exactly_one_resource(
+        list_destinations(
+            workspace_id=workspace_id,
+            name_filter=destination_name,
+            api_key=api_key,
+            api_root=api_root,
+            limit=2,
+        )
+    )
+
+
+# Run connections
+
+
+def run_connection(
+    workspace_id: str,
+    connection_id: str,
+    *,
+    api_root: str,
+    api_key: str,
+) -> models.JobResponse:
+    """Start a new sync job for a connection and return the job response.
+
+    This call is non-blocking; use the returned job ID to poll the job's status.
+    """
+    _ = workspace_id  # Not used (yet)
+    airbyte_instance = get_airbyte_server_instance(
+        api_key=api_key,
+        api_root=api_root,
+    )
+    response = airbyte_instance.jobs.create_job(
+        models.JobCreateRequest(
+            connection_id=connection_id,
+            job_type=models.JobTypeEnum.SYNC,
+        ),
+    )
+    if status_ok(response.status_code) and response.job_response:
+        return response.job_response
+
+    raise AirbyteConnectionSyncError(
+        connection_id=connection_id,
+        context={
+            "workspace_id": workspace_id,
+        },
+        response=response,
+    )
+
+
+# Create and delete sources
+
+
+def create_source(
+    name: str,
+    *,
+    workspace_id: str,
+    config: dict[str, Any],
+    api_root: str,
+    api_key: str,
+) -> api.SourceResponse:
+    """Create a source."""
+    airbyte_instance = get_airbyte_server_instance(
+        api_key=api_key,
+        api_root=api_root,
+    )
+    response: api.CreateSourceResponse = airbyte_instance.sources.create_source(
+        models.SourceCreateRequest(
+            name=name,
+            workspace_id=workspace_id,
+            configuration=config,  # TODO: wrap in a proper configuration object
+            definition_id=None,  # Not used alternative to config.sourceType.
+            secret_id=None,  # For OAuth, not yet supported
+        ),
+    )
+    if status_ok(response.status_code) and response.source_response:
+        return response.source_response
+
+    raise AirbyteError(
+        message="Could not create source.",
+        response=response,
+    )
+
+
+def delete_source(
+    source_id: str,
+    *,
+    api_root: str,
+    api_key: str,
+    workspace_id: str | None = None,
+) -> None:
+    """Delete a source."""
+    _ = workspace_id  # Not used (yet)
+    airbyte_instance = get_airbyte_server_instance(
+        api_key=api_key,
+        api_root=api_root,
+    )
+    response = airbyte_instance.sources.delete_source(
+        api.DeleteSourceRequest(
+            source_id=source_id,
+        ),
+    )
+    if not status_ok(response.status_code):
+        raise AirbyteError(
+            context={
+                "source_id": source_id,
+                "response": response,
+            },
+        )
+
+
+# Create and delete destinations
+
+
+def create_destination(
+    name: str,
+    *,
+    workspace_id: str,
+    config: dict[str, Any],
+    api_root: str,
+    api_key: str,
+) -> api.DestinationResponse:
+    """Create a destination."""
+    airbyte_instance = get_airbyte_server_instance(
+        api_key=api_key,
+        api_root=api_root,
+    )
+    response: api.CreateDestinationResponse = airbyte_instance.destinations.create_destination(
+        models.DestinationCreateRequest(
+            name=name,
+            workspace_id=workspace_id,
+            configuration=config,  # TODO: wrap in a proper configuration object
+        ),
+    )
+    if status_ok(response.status_code) and response.destination_response:
+        return response.destination_response
+
+    raise AirbyteError(
+        message="Could not create destination.",
+        response=response,
+    )
+
+
+def delete_destination(
+    destination_id: str,
+    *,
+    api_root: str,
+    api_key: str,
+    workspace_id: str | None = None,
+) -> None:
+    """Delete a destination."""
+    _ = workspace_id  # Not used (yet)
+    airbyte_instance = get_airbyte_server_instance(
+        api_key=api_key,
+        api_root=api_root,
+    )
+    response = airbyte_instance.destinations.delete_destination(
api.DeleteDestinationRequest( + destination_id=destination_id, + ), + ) + if not status_ok(response.status_code): + raise AirbyteError( + context={ + "destination_id": destination_id, + "response": response, + }, + ) + + +# Create and delete connections + + +def create_connection( + name: str, + *, + source_id: str, + destination_id: str, + api_root: str, + api_key: str, + workspace_id: str | None = None, + prefix: str, + selected_stream_names: list[str] | None = None, +) -> models.ConnectionResponse: + _ = workspace_id # Not used (yet) + airbyte_instance = get_airbyte_server_instance( + api_key=api_key, + api_root=api_root, + ) + stream_configurations: models.StreamConfigurations | None = None + if selected_stream_names: + stream_configuration_list = [] + for stream_name in selected_stream_names: + stream_configuration = models.StreamConfiguration( + name=stream_name, + ) + stream_configuration_list.append(stream_configuration) + + stream_configurations = models.StreamConfigurations(stream_configuration_list) + + response = airbyte_instance.connections.create_connection( + models.ConnectionCreateRequest( + name=name, + source_id=source_id, + destination_id=destination_id, + configurations=stream_configurations, + prefix=prefix, + ), + ) + if not status_ok(response.status_code): + raise AirbyteError( + context={ + "source_id": source_id, + "destination_id": destination_id, + "response": response, + }, + ) + + return response.connection_response + + +def delete_connection( + connection_id: str, + api_root: str, + workspace_id: str, + api_key: str, +) -> None: + _ = workspace_id # Not used (yet) + airbyte_instance = get_airbyte_server_instance( + api_key=api_key, + api_root=api_root, + ) + response = airbyte_instance.connections.delete_connection( + api.DeleteConnectionRequest( + connection_id=connection_id, + ), + ) + if not status_ok(response.status_code): + raise AirbyteError( + context={ + "connection_id": connection_id, + "response": response, + }, + ) + + +# Legacy API Functions + + +def _transform_legacy_api_root(api_root: str) -> str: + """Transform the API root to the legacy API root if needed.""" + if api_root == CLOUD_API_ROOT: + # We know the user is using Airbyte Cloud, so we can safely return the legacy API root. + return LEGACY_API_ROOT + + # TODO: Figure out how to translate an OSS/Enterprise API root to the legacy Config API root. + raise NotImplementedError( + "Airbyte OSS and Enterprise are not currently supported for this operation." + ) + + +def check_connector_config( + connector_id: str, + connector_type: ConnectorTypeEnum, + workspace_id: str, + *, + api_key: str, + api_root: str, +) -> None: + """Check source or destination with its current config. + + Raises `AirbyteConnectorCheckFailedError` if the check fails. + + This calls the Config API because the Airbyte API does not support this operation. 
+ + Equivalent to: + + ```bash + curl -X POST "https://cloud.airbyte.com/api/v1/sources/check_connection" \ + -H "accept: application/json"\ + -H "content-type: application/json" \ + -d '{"sourceId":"18efe99a-7400-4000-8d95-ca2cb0e7b401"}' + ``` + + API Docs: + https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/sources/check_connection + """ + legacy_api_root = _transform_legacy_api_root(api_root) + + _ = workspace_id # Not used + response: requests.Response = requests.post( + f"{legacy_api_root}/{connector_type.value}s/check_connection", + headers={ + "accept": "application/json", + "content-type": "application/json", + "Authorization": f"Bearer {api_key}", + }, + json={ + f"{connector_type.value}Id": connector_id, + }, + ) + response.raise_for_status() + + response_json = response.json() + if not response_json.get("status", None) == "succeeded": + raise exc.AirbyteConnectorCheckFailedError( + message=response_json.get("message", None), + context=response_json, + ) diff --git a/airbyte/cloud/_resources.py b/airbyte/cloud/_resources.py new file mode 100644 index 00000000..1582917e --- /dev/null +++ b/airbyte/cloud/_resources.py @@ -0,0 +1,55 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Resources for working with Airbyte Cloud in PyAirbyte.""" + +from __future__ import annotations + +from functools import wraps +from typing import Any, Callable, Protocol + + +AllowedAny = Any # When Any is allowed + + +# Define interface for generic resource info response +class ResourceInfoResponse(Protocol): + """An interface for resource info responses from the Airbyte Cloud API. + + This interface is used to define the expected structure of resource info responses + from the Airbyte Cloud API. + """ + + +# Decorator that makes sure the resource info is fetched before calling a method +def requires_fetch(func: Callable[..., Any]) -> Callable[..., Any]: + """A decorator that fetches the resource info before calling the decorated method. + + This decorator is used to ensure that the resource info is fetched before calling a method + that requires the resource info. + """ + + @wraps(func) + def wrapper( + self: ICloudResource, + *args: AllowedAny, + **kwargs: AllowedAny, + ) -> AllowedAny: + if not self._resource_info: + self._resource_info = self._fetch_resource_info() + + return func(self, *args, **kwargs) + + return wrapper + + +class ICloudResource(Protocol): + """A resource in Airbyte Cloud. + + You can use a resource object to retrieve information about the resource and manage the + resource. + """ + + _resource_info: ResourceInfoResponse | None + + def _fetch_resource_info(self) -> ResourceInfoResponse: + """Populate the resource with data from the API.""" + ... 
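The `requires_fetch` decorator and `ICloudResource` protocol above implement a lazy fetch-and-cache pattern: a resource object starts out holding only an ID, and `_resource_info` is populated from the API the first time a decorated method needs it. Below is a minimal, self-contained sketch of that pattern; the `FakeJob` class and its canned payload are hypothetical stand-ins, not part of this changeset.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from functools import wraps
from typing import Any, Callable


def requires_fetch(func: Callable[..., Any]) -> Callable[..., Any]:
    """Fetch and cache `_resource_info` before running the decorated method."""

    @wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if not self._resource_info:
            self._resource_info = self._fetch_resource_info()
        return func(self, *args, **kwargs)

    return wrapper


@dataclass
class FakeJob:
    """A hypothetical stand-in for a cloud resource such as a sync job."""

    job_id: str
    _resource_info: dict[str, Any] | None = field(default=None, init=False)

    def _fetch_resource_info(self) -> dict[str, Any]:
        # A real resource would call the Airbyte API here; we fake the payload.
        return {"status": "succeeded"}

    @property
    @requires_fetch
    def status(self) -> str:
        assert self._resource_info is not None  # Guaranteed by the decorator.
        return self._resource_info["status"]


print(FakeJob(job_id="123").status)  # Fetches once, then prints "succeeded"
```

Because the cache check lives in one decorator, each property body stays a one-liner instead of repeating the fetch-if-missing boilerplate.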
diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py index 43156230..02708b59 100644 --- a/airbyte/cloud/connections.py +++ b/airbyte/cloud/connections.py @@ -3,97 +3,82 @@ from __future__ import annotations -from typing import TYPE_CHECKING, cast +from dataclasses import dataclass, field +from typing import TYPE_CHECKING -from airbyte._util import api_util +from airbyte_api.models.connectionresponse import ConnectionResponse + +from airbyte.cloud import _api_util +from airbyte.cloud._resources import ICloudResource from airbyte.cloud.sync_results import SyncResult if TYPE_CHECKING: + from collections.abc import Iterator + from airbyte_api.models import ConnectionResponse, JobResponse + from airbyte.cloud.connectors import CloudConnector from airbyte.cloud.workspaces import CloudWorkspace -class CloudConnection: +@dataclass +class CloudConnection(ICloudResource): """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud. You can use a connection object to run sync jobs, retrieve logs, and manage the connection. """ - def __init__( - self, - workspace: CloudWorkspace, - connection_id: str, - source: str | None = None, - destination: str | None = None, - ) -> None: - """It is not recommended to create a `CloudConnection` object directly. + connection_id: str + """The ID of the connection.""" - Instead, use `CloudWorkspace.get_connection()` to create a connection object. - """ - self.connection_id = connection_id - """The ID of the connection.""" + workspace: CloudWorkspace + """The workspace that the connection belongs to.""" - self.workspace = workspace - """The workspace that the connection belongs to.""" + _source_id: str | None = None + """The ID of the source.""" - self._source_id = source - """The ID of the source.""" - - self._destination_id = destination - """The ID of the destination.""" + _destination_id: str | None = None + """The ID of the destination.""" - self._connection_info: ConnectionResponse | None = None + _resource_info: ConnectionResponse | None = field(default=None, init=False) + """The connection info for the connection. 
Internal use only."""
 
-    def _fetch_connection_info(self) -> ConnectionResponse:
+    def _fetch_resource_info(self) -> ConnectionResponse:
         """Populate the connection with data from the API."""
-        return api_util.get_connection(
-            workspace_id=self.workspace.workspace_id,
+        self._resource_info = _api_util.fetch_connection_info(
             connection_id=self.connection_id,
             api_root=self.workspace.api_root,
             api_key=self.workspace.api_key,
         )
+        return self._resource_info
 
     # Properties
 
     @property
     def source_id(self) -> str:
         """The ID of the source."""
-        if not self._source_id:
-            if not self._connection_info:
-                self._connection_info = self._fetch_connection_info()
-
-            self._source_id = self._connection_info.source_id
-
-        return cast(str, self._source_id)
+        return self._fetch_resource_info().source_id
 
     @property
     def destination_id(self) -> str:
         """The ID of the destination."""
-        if not self._destination_id:
-            if not self._connection_info:
-                self._connection_info = self._fetch_connection_info()
-
-            self._destination_id = self._connection_info.source_id
+        return self._fetch_resource_info().destination_id
 
-        return cast(str, self._destination_id)
+    @property
+    def destination(self) -> CloudConnector:
+        """The destination."""
+        return self.workspace.get_destination(destination_id=self.destination_id)
 
     @property
     def stream_names(self) -> list[str]:
         """The stream names."""
-        if not self._connection_info:
-            self._connection_info = self._fetch_connection_info()
-
-        return [stream.name for stream in self._connection_info.configurations.streams]
+        return [stream.name for stream in self._fetch_resource_info().configurations.streams]
 
     @property
     def table_prefix(self) -> str:
         """The table prefix."""
-        if not self._connection_info:
-            self._connection_info = self._fetch_connection_info()
-
-        return self._connection_info.prefix
+        return self._fetch_resource_info().prefix
 
     @property
     def connection_url(self) -> str | None:
@@ -112,7 +97,7 @@ def run_sync(
         wait_timeout: int = 300,
     ) -> SyncResult:
         """Run a sync."""
-        connection_response = api_util.run_connection(
+        job_response: JobResponse = _api_util.run_connection(
             connection_id=self.connection_id,
             api_root=self.workspace.api_root,
             api_key=self.workspace.api_key,
@@ -121,7 +106,8 @@
         sync_result = SyncResult(
             workspace=self.workspace,
             connection=self,
-            job_id=connection_response.job_id,
+            job_id=job_response.job_id,
+            table_name_prefix=self.table_prefix,
         )
 
         if wait:
@@ -141,7 +127,7 @@ def get_previous_sync_logs(
         limit: int = 10,
     ) -> list[SyncResult]:
         """Get the previous sync logs for a connection."""
-        sync_logs: list[JobResponse] = api_util.get_job_logs(
+        sync_logs: Iterator[JobResponse] = _api_util.list_jobs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
@@ -153,7 +139,8 @@
                 workspace=self.workspace,
                 connection=self,
                 job_id=sync_log.job_id,
-                _latest_job_info=sync_log,
+                _resource_info=sync_log,
+                table_name_prefix=self.table_prefix,
             )
             for sync_log in sync_logs
         ]
@@ -170,7 +157,7 @@
         """
         if job_id is None:
             # Get the most recent sync job
-            results = self.get_previous_sync_logs(
+            results: list[SyncResult] = self.get_previous_sync_logs(
                 limit=1,
             )
             if results:
@@ -183,11 +170,12 @@
             workspace=self.workspace,
             connection=self,
             job_id=job_id,
+            table_name_prefix=self.table_prefix,
         )
 
     # Deletions
 
-    def _permanently_delete(
+    def permanently_delete_connection(
         self,
         *,
         delete_source: bool = False,
@@ -199,16 +187,10 @@
delete_source: Whether to also delete the source. delete_destination: Whether to also delete the destination. """ - self.workspace._permanently_delete_connection( # noqa: SLF001 # Non-public API (for now) - connection=self - ) + self.workspace.permanently_delete_connection(connection=self) if delete_source: - self.workspace._permanently_delete_source( # noqa: SLF001 # Non-public API (for now) - source=self.source_id - ) + self.workspace.permanently_delete_source(source=self.source_id) if delete_destination: - self.workspace._permanently_delete_destination( # noqa: SLF001 # Non-public API - destination=self.destination_id, - ) + self.workspace.permanently_delete_destination(destination=self.destination_id) diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py new file mode 100644 index 00000000..d7e1654f --- /dev/null +++ b/airbyte/cloud/connectors.py @@ -0,0 +1,132 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Cloud Connectors.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from airbyte import exceptions as exc +from airbyte.cloud import _api_util +from airbyte.cloud._resources import ICloudResource, requires_fetch +from airbyte.cloud.constants import ConnectorTypeEnum +from airbyte.exceptions import PyAirbyteInputError + + +if TYPE_CHECKING: + from airbyte._util.api_imports import DestinationResponse, SourceResponse + from airbyte.cloud.workspaces import CloudWorkspace + + +@dataclass +class CloudConnector(ICloudResource): + """A connector is a source or destination in Airbyte Cloud. + + You can use a connector object to retrieve information about the connector and manage the + connector. + """ + + workspace: CloudWorkspace + """The workspace that the connector belongs to.""" + + connector_id: str + """The ID of the connector.""" + + connector_type: ConnectorTypeEnum + """The type of the connector.""" + + _resource_info: SourceResponse | DestinationResponse | None = field(default=None, init=False) + """The connector info for the connector. Internal use only.""" + + @property + @requires_fetch + def name(self) -> str: + """Return the name of the connector.""" + assert self._resource_info, "Decorator should have fetched the resource info." + return self._resource_info.name + + def _fetch_resource_info(self) -> SourceResponse | DestinationResponse: + """Populate the connector with data from the API.""" + if self._resource_info: + return self._resource_info + + if self.is_source: + self._resource_info = _api_util.fetch_source_info( + source_id=self.source_id, + api_root=self.workspace.api_root, + api_key=self.workspace.api_key, + ) + return self._resource_info + + if self.is_destination: + self._resource_info = _api_util.fetch_destination_info( + destination_id=self.destination_id, + api_root=self.workspace.api_root, + api_key=self.workspace.api_key, + ) + return self._resource_info + + raise exc.PyAirbyteInternalError( + message=f"Connector {self.name} is not a source or destination." 
+ ) + + @property + def is_source(self) -> bool: + """Return true if the connector is a source.""" + return self.connector_type is ConnectorTypeEnum.SOURCE + + @property + def is_destination(self) -> bool: + """Return true if the connector is a destination.""" + return self.connector_type is ConnectorTypeEnum.DESTINATION + + def _assert_is_source(self) -> None: + """Raise an error if the connector is not a source.""" + if not self.is_source: + raise PyAirbyteInputError(message=f"Connector {self.name} is not a source.") + + def _assert_is_destination(self) -> None: + """Raise an error if the connector is not a destination.""" + if not self.is_destination: + raise PyAirbyteInputError(message=f"Connector {self.name} is not a destination.") + + @property + def source_id(self) -> str: + """Return the ID of the source if this is a source. Otherwise, raise an error. + + Raises: + PyAirbyteInputError: If the connector is not a source. + """ + self._assert_is_source() + + return self.connector_id + + @property + def destination_id(self) -> str: + """Return the ID of the destination if this is a destination. Otherwise, raise an error. + + Raises: + PyAirbyteInputError: If the connector is not a destination. + """ + self._assert_is_destination() + + return self.connector_id + + @classmethod + def from_connector_id( + cls, + workspace: CloudWorkspace, + connector_id: str, + connector_type: ConnectorTypeEnum, + ) -> CloudConnector: + """Create a connector object from a connection ID.""" + return cls( + workspace=workspace, + connector_id=connector_id, + connector_type=connector_type, + ) + + @property + def configuration(self) -> dict: + """Return the configuration of the connector.""" + return self._fetch_resource_info().configuration diff --git a/airbyte/cloud/constants.py b/airbyte/cloud/constants.py index de40985c..cc18749a 100644 --- a/airbyte/cloud/constants.py +++ b/airbyte/cloud/constants.py @@ -3,7 +3,7 @@ from __future__ import annotations -from airbyte._util.api_imports import JobStatusEnum +from airbyte._util.api_imports import ConnectorTypeEnum, JobStatusEnum FINAL_STATUSES: set[JobStatusEnum] = { @@ -24,3 +24,12 @@ "snowflake", } """List of Airbyte Cloud destinations that PyAirbyte is able to read from.""" + + +__all__: list[str] = [ + "FINAL_STATUSES", + "FAILED_STATUSES", + "READABLE_DESTINATION_TYPES", + "JobStatusEnum", + "ConnectorTypeEnum", +] diff --git a/airbyte/cloud/experimental.py b/airbyte/cloud/experimental.py index fbc3ace4..28b8b2ad 100644 --- a/airbyte/cloud/experimental.py +++ b/airbyte/cloud/experimental.py @@ -34,26 +34,11 @@ # This module is not imported anywhere by default, so this warning should only print if the user # explicitly imports it. warnings.warn( - message="The `airbyte.cloud.experimental` module is experimental and may change in the future.", - category=FutureWarning, + message="The `airbyte.cloud.experimental` module is deprecated. 
All features are finalized.", + category=DeprecationWarning, stacklevel=2, ) - -class CloudWorkspace(Stable_CloudWorkspace): - __doc__ = ( - f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudConnection.__doc__}" - ) - deploy_connection = Stable_CloudWorkspace._deploy_connection - deploy_source = Stable_CloudWorkspace._deploy_source - deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination - permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection - permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source - permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination - - -class CloudConnection(Stable_CloudConnection): - __doc__ = ( - f"Experimental implementation of `.CloudConnection`.\n\n{Stable_CloudConnection.__doc__}" - ) - permanently_delete = Stable_CloudConnection._permanently_delete +# All experimental methods are now stable. +CloudConnection = Stable_CloudConnection +CloudWorkspace = Stable_CloudWorkspace diff --git a/airbyte/cloud/sync_results.py b/airbyte/cloud/sync_results.py index 997566e7..1d6ace36 100644 --- a/airbyte/cloud/sync_results.py +++ b/airbyte/cloud/sync_results.py @@ -102,12 +102,13 @@ import time from collections.abc import Iterator, Mapping -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from typing import TYPE_CHECKING, Any, final -from airbyte._util import api_util +from airbyte.cloud import _api_util from airbyte.cloud._destination_util import create_cache_from_destination_config +from airbyte.cloud._resources import ICloudResource from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES from airbyte.datasets import CachedDataset from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError @@ -119,92 +120,69 @@ if TYPE_CHECKING: import sqlalchemy - from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum + from airbyte._util.api_imports import JobResponse, JobStatusEnum from airbyte.caches.base import CacheBase from airbyte.cloud.connections import CloudConnection + from airbyte.cloud.connectors import CloudConnector from airbyte.cloud.workspaces import CloudWorkspace @dataclass -class SyncResult: +class SyncResult(ICloudResource): """The result of a sync operation. **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by interacting with the `.CloudWorkspace` and `.CloudConnection` objects. 
""" - workspace: CloudWorkspace - connection: CloudConnection - job_id: str - table_name_prefix: str = "" - table_name_suffix: str = "" - _latest_job_info: JobResponse | None = None - _connection_response: ConnectionResponse | None = None + workspace: CloudWorkspace = field() + connection: CloudConnection = field() + job_id: str = field() + table_name_prefix: str = field() + _resource_info: JobResponse | None = field(default=None) _cache: CacheBase | None = None + _destination: CloudConnector | None = None @property def job_url(self) -> str: """Return the URL of the sync job.""" return f"{self.connection.job_history_url}/{self.job_id}" - def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse: - """Return connection info for the sync job.""" - if self._connection_response and not force_refresh: - return self._connection_response - - self._connection_response = api_util.get_connection( - workspace_id=self.workspace.workspace_id, - api_root=self.workspace.api_root, - api_key=self.workspace.api_key, - connection_id=self.connection.connection_id, - ) - return self._connection_response - - def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]: - """Return the destination configuration for the sync job.""" - connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh) - destination_response = api_util.get_destination( - destination_id=connection_info.destination_id, - api_root=self.workspace.api_root, - api_key=self.workspace.api_key, - ) - return destination_response.configuration - def is_job_complete(self) -> bool: """Check if the sync job is complete.""" return self.get_job_status() in FINAL_STATUSES def get_job_status(self) -> JobStatusEnum: """Check if the sync job is still running.""" - return self._fetch_latest_job_info().status + return self._fetch_resource_info().status - def _fetch_latest_job_info(self) -> JobResponse: + def _fetch_resource_info(self) -> JobResponse: """Return the job info for the sync job.""" - if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES: - return self._latest_job_info + if self._resource_info and self._resource_info.status in FINAL_STATUSES: + return self._resource_info - self._latest_job_info = api_util.get_job_info( + self._resource_info = _api_util.fetch_job_info( job_id=self.job_id, api_root=self.workspace.api_root, api_key=self.workspace.api_key, ) - return self._latest_job_info + return self._resource_info @property def bytes_synced(self) -> int: """Return the number of records processed.""" - return self._fetch_latest_job_info().bytes_synced + return self._fetch_resource_info().bytes_synced @property def records_synced(self) -> int: """Return the number of records processed.""" - return self._fetch_latest_job_info().rows_synced + return self._fetch_resource_info().rows_synced @property def start_time(self) -> datetime: """Return the start time of the sync job in UTC.""" # Parse from ISO 8601 format: - return datetime.fromisoformat(self._fetch_latest_job_info().start_time) + return datetime.fromisoformat(self._fetch_resource_info().start_time) def raise_failure_status( self, @@ -219,8 +197,8 @@ def raise_failure_status( Otherwise, do nothing. 
""" - if not refresh_status and self._latest_job_info: - latest_status = self._latest_job_info.status + if not refresh_status and self._resource_info: + latest_status = self._resource_info.status else: latest_status = self.get_job_status() @@ -262,14 +240,14 @@ def wait_for_completion( return latest_status # This will be a non-final status - time.sleep(api_util.JOB_WAIT_INTERVAL_SECS) + time.sleep(_api_util.JOB_WAIT_INTERVAL_SECS) def get_sql_cache(self) -> CacheBase: """Return a SQL Cache object for working with the data in a SQL-based destination's.""" if self._cache: return self._cache - destination_configuration: dict[str, Any] = self._get_destination_configuration() + destination_configuration: dict[str, Any] = self.connection.destination.configuration self._cache = create_cache_from_destination_config( destination_configuration=destination_configuration ) diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index b9bb6a2f..0373426f 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -7,33 +7,180 @@ from __future__ import annotations +import warnings +from contextlib import suppress from dataclasses import dataclass -from typing import TYPE_CHECKING +from functools import wraps +from typing import TYPE_CHECKING, Any, Callable from airbyte import exceptions as exc -from airbyte._util.api_util import ( - CLOUD_API_ROOT, - create_connection, - create_destination, - create_source, - delete_connection, - delete_destination, - delete_source, - get_workspace, -) +from airbyte._util import iter as iter_util +from airbyte.cloud import _api_util from airbyte.cloud._destination_util import get_destination_config_from_cache +from airbyte.cloud._resources import ICloudResource from airbyte.cloud.connections import CloudConnection +from airbyte.cloud.connectors import CloudConnector +from airbyte.cloud.constants import ConnectorTypeEnum from airbyte.cloud.sync_results import SyncResult -from airbyte.sources.base import Source if TYPE_CHECKING: from airbyte._util.api_imports import DestinationResponse from airbyte.caches.base import CacheBase + from airbyte.sources.base import Source + + +# Decorator for resolving connection objects +def resolve_connection(func: Callable[..., Any]) -> Callable[..., Any]: + @wraps(func) + def wrapper( + self: CloudWorkspace, + *args: Any, # noqa: ANN401 + connection: str | CloudConnection | None = None, + connection_id: str | None = None, + **kwargs: Any, # noqa: ANN401 + ) -> Any: # noqa: ANN401 + if connection_id is not None: + warnings.warn( + message="The `connection_id` parameter is deprecated. Use `connection` instead.", + category=DeprecationWarning, + stacklevel=2, + ) + connection = connection_id + + if isinstance(connection, str): + connection = CloudConnection( + workspace=self, + connection_id=connection, + ) + + return func(self, *args, connection=connection, **kwargs) + + return wrapper + + +# Decorator for resolving source objects +def resolve_source(func: Callable[..., Any]) -> Callable[..., Any]: + @wraps(func) + def wrapper( + self: CloudWorkspace, + *args: Any, # noqa: ANN401 + source: str | CloudConnector | None = None, + source_id: str | None = None, + **kwargs: Any, # noqa: ANN401 + ) -> Any: # noqa: ANN401 + if source_id is not None: + warnings.warn( + "The `source_id` parameter is deprecated. 
+
+
+# Decorator for resolving source objects
+def resolve_source(func: Callable[..., Any]) -> Callable[..., Any]:
+    @wraps(func)
+    def wrapper(
+        self: CloudWorkspace,
+        *args: Any,  # noqa: ANN401
+        source: str | CloudConnector | None = None,
+        source_id: str | None = None,
+        **kwargs: Any,  # noqa: ANN401
+    ) -> Any:  # noqa: ANN401
+        if source_id is not None:
+            warnings.warn(
+                "The `source_id` parameter is deprecated. Use the `source` parameter instead.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            source = source_id
+
+        if isinstance(source, str):
+            source = CloudConnector(
+                workspace=self,
+                connector_id=source,
+                connector_type=ConnectorTypeEnum.SOURCE,
+            )
+
+        return func(self, *args, source=source, **kwargs)
+
+    return wrapper
+
+
+# Decorator for resolving source IDs from objects
+def resolve_source_id(func: Callable[..., Any]) -> Callable[..., Any]:
+    @wraps(func)
+    def wrapper(
+        self: CloudWorkspace,
+        *args: Any,  # noqa: ANN401
+        source: str | CloudConnector | None = None,
+        source_id: str | None = None,
+        **kwargs: Any,  # noqa: ANN401
+    ) -> Any:  # noqa: ANN401
+        # Apply the deprecated alias before any type checks, so callers who pass
+        # only `source_id` are warned rather than rejected:
+        if source_id is not None:
+            warnings.warn(
+                "The `source_id` parameter is deprecated. Use the `source` parameter instead.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            source = source_id
+
+        if not isinstance(source, (str, CloudConnector)):
+            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003
+
+        if isinstance(source, CloudConnector):
+            source = source.connector_id
+
+        return func(self, *args, source=source, **kwargs)
+
+    return wrapper
+
+
+# Decorator for resolving destination objects
+def resolve_destination(func: Callable[..., Any]) -> Callable[..., Any]:
+    @wraps(func)
+    def wrapper(
+        self: CloudWorkspace,
+        *args: Any,  # noqa: ANN401
+        destination: str | CloudConnector | None = None,
+        destination_id: str | None = None,
+        **kwargs: Any,  # noqa: ANN401
+    ) -> Any:  # noqa: ANN401
+        if destination_id is not None:
+            warnings.warn(
+                "The `destination_id` parameter is deprecated. Use `destination` instead.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            destination = destination_id
+
+        if isinstance(destination, str):
+            destination = CloudConnector(
+                workspace=self,
+                connector_id=destination,
+                connector_type=ConnectorTypeEnum.DESTINATION,
+            )
+
+        return func(self, *args, destination=destination, **kwargs)
+
+    return wrapper
+
+
+# Decorator for resolving destination IDs from objects
+def resolve_destination_id(func: Callable[..., Any]) -> Callable[..., Any]:
+    @wraps(func)
+    def wrapper(
+        self: CloudWorkspace,
+        *args: Any,  # noqa: ANN401
+        destination: str | CloudConnector | None = None,
+        destination_id: str | None = None,
+        **kwargs: Any,  # noqa: ANN401
+    ) -> Any:  # noqa: ANN401
+        if destination is None and destination_id is None:
+            raise exc.PyAirbyteInputError(
+                message="No destination or destination ID provided.",
+            )
+
+        if destination_id is not None:
+            warnings.warn(
+                "The `destination_id` parameter is deprecated. Use `destination` instead.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            destination = destination_id
+
+        if isinstance(destination, CloudConnector):
+            destination = destination.connector_id
+
+        return func(self, *args, destination=destination, **kwargs)
+
+    return wrapper
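All five decorators above share one shape: apply the deprecated `*_id` alias first, then normalize ID strings and objects into the form the wrapped method expects. A self-contained, runnable sketch of that shape, using a hypothetical `Widget` stand-in rather than the real `CloudConnection`/`CloudConnector` types:

    from __future__ import annotations

    import warnings
    from dataclasses import dataclass
    from functools import wraps
    from typing import Any, Callable


    @dataclass
    class Widget:  # Hypothetical stand-in for CloudConnection / CloudConnector.
        widget_id: str


    def resolve_widget(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def wrapper(
            self: Any,
            *args: Any,
            widget: str | Widget | None = None,
            widget_id: str | None = None,
            **kwargs: Any,
        ) -> Any:
            # Apply the deprecated alias *before* any type checks, so legacy
            # callers are warned rather than rejected:
            if widget_id is not None:
                warnings.warn(
                    "`widget_id` is deprecated. Use `widget` instead.",
                    DeprecationWarning,
                    stacklevel=2,
                )
                widget = widget_id

            # Then normalize plain ID strings into full objects:
            if isinstance(widget, str):
                widget = Widget(widget_id=widget)

            return func(self, *args, widget=widget, **kwargs)

        return wrapper


    class Service:
        @resolve_widget
        def describe(self, *, widget: Widget | None = None) -> str:
            assert isinstance(widget, Widget), "Decorator should resolve the widget."
            return f"widget {widget.widget_id}"


    svc = Service()
    assert svc.describe(widget="abc") == "widget abc"  # ID string is wrapped.
    assert svc.describe(widget_id="abc") == "widget abc"  # Deprecated alias still works.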
""" - _ = get_workspace( + _ = _api_util.fetch_workspace_info( + workspace_id=self.workspace_id, api_root=self.api_root, api_key=self.api_key, - workspace_id=self.workspace_id, ) - print(f"Successfully connected to workspace: {self.workspace_url}") + + print("Successfully connected to workspace.") # Deploy and delete sources - # TODO: Make this a public API - def _deploy_source( + def deploy_source( self, source: Source, - ) -> str: + name: str, + ) -> CloudConnector: """Deploy a source to the workspace. - Returns the newly deployed source ID. + This method will deploy a source to the workspace and return the `CloudConnector` object of + the deployed source. + + Args: + name (str): The key to use for the source name. This is used to provide + idempotency when deploying the same source multiple times. If `None`, then + `source_id` is required. If a matching source source is found and `update_existing` + is `False`, then a `AirbyteResourceAlreadyExists` exception will be raised. + source (Source): The source to deploy. """ - source_configuration = source.get_config().copy() + source_configuration: dict[str, Any] = source.get_config().copy() source_configuration["sourceType"] = source.name.replace("source-", "") - deployed_source = create_source( - name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)", + iter_util.no_existing_resources( + _api_util.list_sources( + workspace_id=self.workspace_id, + name_filter=name, + api_root=self.api_root, + api_key=self.api_key, + limit=1, + ), + ) + + deployed_source = _api_util.create_source( + name=name, api_root=self.api_root, api_key=self.api_key, workspace_id=self.workspace_id, config=source_configuration, ) - # Set the deployment Ids on the source object + # Set the deployment IDs on the source object source._deployed_api_root = self.api_root # noqa: SLF001 # Accessing nn-public API source._deployed_workspace_id = self.workspace_id # noqa: SLF001 # Accessing nn-public API source._deployed_source_id = deployed_source.source_id # noqa: SLF001 # Accessing nn-public API - return deployed_source.source_id + return CloudConnector( + workspace=self, + connector_id=deployed_source.source_id, + connector_type=ConnectorTypeEnum.SOURCE, + ) - def _permanently_delete_source( + def get_source( self, - source: str | Source, - ) -> None: - """Delete a source from the workspace. + source_id: str, + ) -> CloudConnector: + """Get a source by ID. - You can pass either the source ID `str` or a deployed `Source` object. + This method does not fetch data from the API. It returns a `CloudConnector` object, which + will be loaded lazily as needed. """ - if not isinstance(source, (str, Source)): - raise ValueError(f"Invalid source type: {type(source)}") # noqa: TRY004, TRY003 + result = CloudConnector( + workspace=self, + connector_id=source_id, + connector_type=ConnectorTypeEnum.SOURCE, + ) + if result.connector_type != "source": + raise exc.PyAirbyteInputError(message="Connector is not a source.") - if isinstance(source, Source): - if not source._deployed_source_id: # noqa: SLF001 - raise ValueError("Source has not been deployed.") # noqa: TRY003 + return result - source_id = source._deployed_source_id # noqa: SLF001 + @resolve_source_id + def permanently_delete_source( + self, + source: str | CloudConnector, + ) -> None: + """Delete a source from the workspace. - elif isinstance(source, str): - source_id = source + You can pass either the source ID `str` or a `CloudConnector` object. 
+ """ + assert isinstance(source, str), "Decorator should resolve source ID." - delete_source( - source_id=source_id, + _api_util.delete_source( + source_id=source, api_root=self.api_root, api_key=self.api_key, ) # Deploy and delete destinations - # TODO: Make this a public API - def _deploy_cache_as_destination( + def deploy_cache_as_destination( self, cache: CacheBase, + *, + name: str, ) -> str: """Deploy a cache to the workspace as a new destination. Returns the newly deployed destination ID. """ - cache_type_name = cache.__class__.__name__.replace("Cache", "") - - deployed_destination: DestinationResponse = create_destination( - name=f"Destination {cache_type_name} (Deployed by PyAirbyte)", + deployed_destination: DestinationResponse = _api_util.create_destination( + name=name, api_root=self.api_root, api_key=self.api_key, workspace_id=self.workspace_id, @@ -147,33 +326,33 @@ def _deploy_cache_as_destination( return deployed_destination.destination_id - def _permanently_delete_destination( + def get_destination( + self, + destination_id: str, + ) -> CloudConnector: + """Get a destination by ID. + + This method does not fetch data from the API. It returns a `CloudConnector` object, which + will be loaded lazily as needed. + """ + return CloudConnector( + workspace=self, + connector_id=destination_id, + connector_type=ConnectorTypeEnum.DESTINATION, + ) + + @resolve_destination_id + def permanently_delete_destination( self, *, - destination: str | None = None, - cache: CacheBase | None = None, + destination: str | CloudConnector | None = None, ) -> None: """Delete a deployed destination from the workspace. You can pass either the `Cache` class or the deployed destination ID as a `str`. """ - if destination is None and cache is None: - raise ValueError("You must provide either a destination ID or a cache object.") # noqa: TRY003 - if destination is not None and cache is not None: - raise ValueError( # noqa: TRY003 - "You must provide either a destination ID or a cache object, not both." - ) - - if cache: - if not cache._deployed_destination_id: # noqa: SLF001 - raise ValueError("Cache has not been deployed.") # noqa: TRY003 - - destination = cache._deployed_destination_id # noqa: SLF001 - - if destination is None: - raise ValueError("No destination ID provided.") # noqa: TRY003 - - delete_destination( + assert isinstance(destination, str), "Decorator should resolve destination ID." + _api_util.delete_destination( destination_id=destination, api_root=self.api_root, api_key=self.api_key, @@ -181,12 +360,14 @@ def _permanently_delete_destination( # Deploy and delete connections - # TODO: Make this a public API - def _deploy_connection( + @resolve_source_id + @resolve_destination_id + def deploy_connection( self, - source: Source | str, - cache: CacheBase | None = None, - destination: str | None = None, + name: str, + *, + source: CloudConnector | str, + destination: CloudConnector | str, table_prefix: str | None = None, selected_streams: list[str] | None = None, ) -> CloudConnection: @@ -195,51 +376,34 @@ def _deploy_connection( Returns the newly deployed connection ID as a `str`. Args: - source (Source | str): The source to deploy. You can pass either an already deployed + source: The source to deploy. You can pass either an already deployed source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object, it will be deployed automatically. - cache (CacheBase, optional): The cache to deploy as a new destination. You can provide - `cache` or `destination`, but not both. 
@@ -181,12 +360,14 @@
         )
 
     # Deploy and delete connections
 
-    # TODO: Make this a public API
-    def _deploy_connection(
+    @resolve_source_id
+    @resolve_destination_id
+    def deploy_connection(
         self,
-        source: Source | str,
-        cache: CacheBase | None = None,
-        destination: str | None = None,
+        name: str,
+        *,
+        source: CloudConnector | str,
+        destination: CloudConnector | str,
         table_prefix: str | None = None,
         selected_streams: list[str] | None = None,
     ) -> CloudConnection:
@@ -195,51 +376,34 @@
         """Deploy a connection to the workspace.
 
-        Returns the newly deployed connection ID as a `str`.
+        Returns a `CloudConnection` object representing the newly deployed connection.
 
         Args:
-            source (Source | str): The source to deploy. You can pass either an already deployed
-                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
-                it will be deployed automatically.
-            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
-                `cache` or `destination`, but not both.
-            destination (str, optional): The destination ID to use. You can provide
-                `cache` or `destination`, but not both.
+            name: The name to use for the connection. The name acts as a key, providing
+                idempotency when deploying the same connection multiple times.
+            source: The source to use. You can pass either an already deployed source ID
+                `str` or a `CloudConnector` object.
+            destination: The destination to use. You can pass either an already deployed
+                destination ID `str` or a `CloudConnector` object.
         """
-        # Resolve source ID
-        source_id: str
-        if isinstance(source, Source):
-            selected_streams = selected_streams or source.get_selected_streams()
-            if source._deployed_source_id:  # noqa: SLF001
-                source_id = source._deployed_source_id  # noqa: SLF001
-            else:
-                source_id = self._deploy_source(source)
-        else:
-            source_id = source
-            if not selected_streams:
-                raise exc.PyAirbyteInputError(
-                    guidance="You must provide `selected_streams` when deploying a source ID."
-                )
-
-        # Resolve destination ID
-        destination_id: str
-        if destination:
-            destination_id = destination
-        elif cache:
-            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
-            if not cache._deployed_destination_id:  # noqa: SLF001
-                destination_id = self._deploy_cache_as_destination(cache)
-            else:
-                destination_id = cache._deployed_destination_id  # noqa: SLF001
-        else:
-            raise exc.PyAirbyteInputError(
-                guidance="You must provide either a destination ID or a cache object."
+        assert isinstance(source, str), "Decorator should resolve source ID."
+        assert isinstance(destination, str), "Decorator should resolve destination ID."
+        existing_connection: CloudConnection | None = None
+        with suppress(exc.AirbyteMissingResourceError):
+            existing_connection = _api_util.fetch_connection_by_name(
+                connection_name=name,
+                workspace_id=self.workspace_id,
+                api_root=self.api_root,
+                api_key=self.api_key,
+            )
+        if existing_connection:
+            raise exc.AirbyteResourceAlreadyExistsError(
+                message="Connection with matching name key already exists.",
+                context={
+                    "name_key": name,
+                    "connection_id": existing_connection.connection_id,
+                },
             )
 
-        assert source_id is not None
-        assert destination_id is not None
-
-        deployed_connection = create_connection(
-            name="Connection (Deployed by PyAirbyte)",
-            source_id=source_id,
-            destination_id=destination_id,
+        deployed_connection = _api_util.create_connection(
+            name=name,
+            source_id=source,
+            destination_id=destination,
             api_root=self.api_root,
             api_key=self.api_key,
             workspace_id=self.workspace_id,
@@ -247,37 +411,55 @@
             prefix=table_prefix or "",
         )
 
-        if isinstance(source, Source):
-            source._deployed_api_root = self.api_root  # noqa: SLF001
-            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
-            source._deployed_source_id = source_id  # noqa: SLF001
-        if cache:
-            cache._deployed_api_root = self.api_root  # noqa: SLF001
-            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
-            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001
-
         return CloudConnection(
             workspace=self,
             connection_id=deployed_connection.connection_id,
-            source=deployed_connection.source_id,
-            destination=deployed_connection.destination_id,
+            _source_id=deployed_connection.source_id,
+            _destination_id=deployed_connection.destination_id,
         )
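Because `deploy_connection` treats the connection name as a uniqueness key, callers can make deployment idempotent by catching the conflict error. A sketch under that assumption, with placeholder IDs:

    from airbyte import exceptions as exc
    from airbyte.cloud import CloudWorkspace

    workspace = CloudWorkspace(workspace_id="<workspace-id>", api_key="<api-key>")  # placeholders

    try:
        connection = workspace.deploy_connection(
            "Nightly Faker Load (Example)",
            source="<source-id>",  # placeholder; a CloudConnector also works
            destination="<destination-id>",  # placeholder
            selected_streams=["users", "products", "purchases"],
        )
    except exc.AirbyteResourceAlreadyExistsError:
        # Already deployed on a previous run; re-attach by name instead:
        connection = workspace.get_connection(name="Nightly Faker Load (Example)")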
""" + if connection_id is None and name is None: + raise exc.PyAirbyteInputError(message="No connection ID or name key provided.") + if connection_id and name: + raise exc.PyAirbyteInputError( + message="You can provide either a connection ID or a name key, but not both." + ) + + if connection_id: + return CloudConnection( + workspace=self, + connection_id=connection_id, + ) + + assert isinstance(name, str), "Name should be a string, per above validation." + + # Else derive connection ID from name key + connection = _api_util.fetch_connection_by_name( + connection_name=name, + workspace_id=self.workspace_id, + api_root=self.api_root, + api_key=self.api_key, + ) + return CloudConnection( workspace=self, - connection_id=connection_id, + connection_id=connection.connection_id, + _source_id=connection.source_id, + _destination_id=connection.destination_id, ) - def _permanently_delete_connection( + def permanently_delete_connection( self, connection: str | CloudConnection, *, @@ -294,32 +476,35 @@ def _permanently_delete_connection( connection_id=connection, ) - delete_connection( + _api_util.delete_connection( connection_id=connection.connection_id, api_root=self.api_root, api_key=self.api_key, workspace_id=self.workspace_id, ) if delete_source: - self._permanently_delete_source(source=connection.source_id) + self.permanently_delete_source(source=connection.source_id) if delete_destination: - self._permanently_delete_destination(destination=connection.destination_id) + self.permanently_delete_destination(destination=connection.destination_id) # Run syncs + @resolve_connection def run_sync( self, - connection_id: str, + connection: str | CloudConnection, *, wait: bool = True, wait_timeout: int = 300, + connection_id: str | None = None, ) -> SyncResult: - """Run a sync on a deployed connection.""" - connection = CloudConnection( - workspace=self, - connection_id=connection_id, - ) + """Run a sync on a deployed connection. + + Note: The `connection_id` parameter is deprecated. Use the `connection` parameter instead. + """ + _ = connection_id # Deprecated + assert isinstance(connection, CloudConnection), "Decorate should have resolved this." return connection.run_sync(wait=wait, wait_timeout=wait_timeout) # Get sync results and previous sync logs @@ -356,6 +541,7 @@ def get_sync_result( workspace=self, connection=connection, job_id=job_id, + table_name_prefix=connection.table_prefix, ) def get_previous_sync_logs( diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index b3b4f367..5068902d 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -59,11 +59,11 @@ class PyAirbyteError(Exception): """Base class for exceptions in Airbyte.""" + message: str | None = None guidance: str | None = None help_url: str | None = None log_text: str | list[str] | None = None context: dict[str, Any] | None = None - message: str | None = None def get_message(self) -> str: """Return the best description for the exception. @@ -383,6 +383,18 @@ class AirbyteConnectionSyncTimeoutError(AirbyteConnectionSyncError): # Airbyte Resource Errors (General) +class AirbyteResourceAlreadyExistsError(PyAirbyteError): + """Resource conflict occurred. + + This error occurs when a requested action cannot be completed because it would conflict with + another resource. For example, creating a resource with a name key that already exists or when + multiple resources are found when only one was expected. 
+ """ + + resource_name_or_id: str | None = None + resource_type: str | None = None + + @dataclass class AirbyteMissingResourceError(AirbyteError): """Remote Airbyte resources does not exist.""" diff --git a/airbyte/secrets/util.py b/airbyte/secrets/util.py index 43afeafe..b5953219 100644 --- a/airbyte/secrets/util.py +++ b/airbyte/secrets/util.py @@ -50,7 +50,7 @@ def get_secret( sources = [sources] # type: ignore [unreachable] # This is a 'just in case' catch. # Replace any SecretSourceEnum strings with the matching SecretManager object - for source in list(sources): + for source in list(sources): # noqa: PERF101, RUF100 # New list so we can modify the original. if isinstance(source, SecretSourceEnum): if source not in available_sources: raise exc.PyAirbyteInputError( diff --git a/tests/integration_tests/cloud/conftest.py b/tests/integration_tests/cloud/conftest.py index c46af8dd..a253ba08 100644 --- a/tests/integration_tests/cloud/conftest.py +++ b/tests/integration_tests/cloud/conftest.py @@ -7,7 +7,7 @@ from pathlib import Path import sys import pytest -from airbyte._util.api_util import CLOUD_API_ROOT +from airbyte.cloud._api_util import CLOUD_API_ROOT from airbyte._executor import _get_bin_dir from airbyte.caches.base import CacheBase from airbyte.cloud import CloudWorkspace diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index d3bc4204..ca5ec6d1 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -9,7 +9,7 @@ import ulid -from airbyte._util import api_util +from airbyte.cloud import _api_util from airbyte_api.models import SourceFaker, DestinationDuckdb @@ -20,7 +20,7 @@ def test_create_and_delete_source( ) -> None: new_resource_name = "deleteme-source-faker" + str(ulid.ULID()).lower()[-6:] source_config = SourceFaker() - source = api_util.create_source( + source = _api_util.create_source( name=new_resource_name, api_root=airbyte_cloud_api_root, api_key=airbyte_cloud_api_key, @@ -31,7 +31,7 @@ def test_create_and_delete_source( assert source.source_type == "faker" assert source.source_id - api_util.delete_source( + _api_util.delete_source( source_id=source.source_id, api_root=airbyte_cloud_api_root, api_key=airbyte_cloud_api_key, @@ -51,7 +51,7 @@ def test_create_and_delete_destination( motherduck_api_key=motherduck_api_key, ) - destination = api_util.create_destination( + destination = _api_util.create_destination( name=new_resource_name, api_root=airbyte_cloud_api_root, api_key=airbyte_cloud_api_key, @@ -62,7 +62,7 @@ def test_create_and_delete_destination( assert destination.destination_type == "duckdb" assert destination.destination_id - api_util.delete_destination( + _api_util.delete_destination( destination_id=destination.destination_id, api_root=airbyte_cloud_api_root, api_key=airbyte_cloud_api_key, @@ -79,7 +79,7 @@ def test_create_and_delete_connection( new_source_name = "deleteme-source-faker" + str(ulid.ULID()).lower()[-6:] new_destination_name = "deleteme-destination-dummy" + str(ulid.ULID()).lower()[-6:] new_connection_name = "deleteme-connection-dummy" + str(ulid.ULID()).lower()[-6:] - source = api_util.create_source( + source = _api_util.create_source( name=new_source_name, api_root=airbyte_cloud_api_root, api_key=airbyte_cloud_api_key, @@ -90,7 +90,7 @@ def test_create_and_delete_connection( assert source.source_type == "faker" assert source.source_id - destination = api_util.create_destination( + destination = 
+    destination = _api_util.create_destination(
         name=new_destination_name,
         api_root=airbyte_cloud_api_root,
         api_key=airbyte_cloud_api_key,
@@ -104,7 +104,7 @@
     assert destination.destination_type == "duckdb"
     assert destination.destination_id
 
-    connection = api_util.create_connection(
+    connection = _api_util.create_connection(
         name=new_connection_name,
         api_root=airbyte_cloud_api_root,
         api_key=airbyte_cloud_api_key,
@@ -118,19 +118,19 @@
     assert connection.destination_id == destination.destination_id
     assert connection.connection_id
 
-    api_util.delete_connection(
+    _api_util.delete_connection(
         connection_id=connection.connection_id,
         api_root=airbyte_cloud_api_root,
         api_key=airbyte_cloud_api_key,
         workspace_id=workspace_id,
     )
-    api_util.delete_source(
+    _api_util.delete_source(
         source_id=source.source_id,
         api_root=airbyte_cloud_api_root,
         api_key=airbyte_cloud_api_key,
         workspace_id=workspace_id,
     )
-    api_util.delete_destination(
+    _api_util.delete_destination(
         destination_id=destination.destination_id,
         api_root=airbyte_cloud_api_root,
         api_key=airbyte_cloud_api_key,
diff --git a/tests/integration_tests/cloud/test_cloud_sql_reads.py b/tests/integration_tests/cloud/test_cloud_sql_reads.py
index 6cb48afa..e1c2ce7a 100644
--- a/tests/integration_tests/cloud/test_cloud_sql_reads.py
+++ b/tests/integration_tests/cloud/test_cloud_sql_reads.py
@@ -5,13 +5,12 @@
 
 from contextlib import suppress
 
+import airbyte as ab
 import pandas as pd
 import pytest
-from sqlalchemy.engine.base import Engine
-
-import airbyte as ab
 from airbyte import cloud
 from airbyte.cloud.sync_results import SyncResult
+from sqlalchemy.engine.base import Engine
 
 
 @pytest.fixture
@@ -38,13 +37,18 @@
     """Test reading from a cache."""
 
     # Deploy source, destination, and connection:
-    source_id = cloud_workspace._deploy_source(source=deployable_source)
-    destination_id = cloud_workspace._deploy_cache_as_destination(
-        cache=new_deployable_cache
-    )
-    connection: cloud.CloudConnection = cloud_workspace._deploy_connection(
+    source_connector = cloud_workspace.deploy_source(
         source=deployable_source,
+        name="IntegTest Source (DELETEME)",
+    )
+    destination_id = cloud_workspace.deploy_cache_as_destination(
         cache=new_deployable_cache,
+        name="IntegTest Cache-as-Destination (DELETEME)",
+    )
+    connection: cloud.CloudConnection = cloud_workspace.deploy_connection(
+        name="IntegTest Connection (DELETEME)",
+        source=source_connector,
+        destination=destination_id,
         table_prefix=new_deployable_cache.table_prefix,
         selected_streams=deployable_source.get_selected_streams(),
     )
@@ -67,15 +71,15 @@
 
     # Cleanup
     with suppress(Exception):
-        cloud_workspace._permanently_delete_connection(
-            connection_id=connection,
+        cloud_workspace.permanently_delete_connection(
+            connection=connection,
             delete_source=True,
             delete_destination=True,
         )
     with suppress(Exception):
-        cloud_workspace._permanently_delete_source(source_id=source_id)
+        cloud_workspace.permanently_delete_source(source=source_connector)
     with suppress(Exception):
-        cloud_workspace._permanently_delete_destination(destination_id=destination_id)
+        cloud_workspace.permanently_delete_destination(destination=destination_id)
 
 
 @pytest.mark.parametrize(
diff --git a/tests/integration_tests/cloud/test_cloud_sync.py b/tests/integration_tests/cloud/test_cloud_sync.py
index 362aeb07..f7919d19 100644
--- a/tests/integration_tests/cloud/test_cloud_sync.py
+++ b/tests/integration_tests/cloud/test_cloud_sync.py
@@ -6,9 +6,8 @@
 
 from __future__ import annotations
 
-import pytest
-
 import airbyte as ab
+import pytest
 from airbyte.caches import MotherDuckCache
 from airbyte.cloud import CloudWorkspace
 from airbyte.cloud.sync_results import SyncResult
@@ -67,7 +66,11 @@ def test_deploy_and_run_connection(
         schema_name="public",
     )
 
-    connection_id: str = cloud_workspace._deploy_connection(source=source, cache=cache)
-    sync_result = cloud_workspace.run_sync(connection_id=connection_id)
+    source_connector = cloud_workspace.deploy_source(
+        source=source,
+        name="My Faker Source (DELETEME)",
+    )
+    destination_id: str = cloud_workspace.deploy_cache_as_destination(
+        cache=cache,
+        name="My MotherDuck Destination (DELETEME)",
+    )
+    connection = cloud_workspace.deploy_connection(
+        name="My Faker Connection (DELETEME)",
+        source=source_connector,
+        destination=destination_id,
+        selected_streams=source.get_selected_streams(),
+    )
+    sync_result = cloud_workspace.run_sync(connection=connection)
     _ = sync_result
 
@@ -75,4 +78,4 @@
     assert cache.stream_names
     assert cache.streams["users"].to_pandas()
 
-    cloud_workspace._permanently_delete_connection(connection_id=connection_id)
+    cloud_workspace.permanently_delete_connection(connection=connection)
diff --git a/tests/integration_tests/cloud/test_cloud_workspaces.py b/tests/integration_tests/cloud/test_cloud_workspaces.py
index 98272419..25334e0d 100644
--- a/tests/integration_tests/cloud/test_cloud_workspaces.py
+++ b/tests/integration_tests/cloud/test_cloud_workspaces.py
@@ -6,26 +6,74 @@
 
 from __future__ import annotations
 
 import airbyte as ab
+from airbyte import cloud
+from airbyte import exceptions as exc
 from airbyte.caches import MotherDuckCache
 from airbyte.cloud import CloudWorkspace
 from airbyte.cloud.connections import CloudConnection
+from pytest import raises
 
 
 def test_deploy_source(
     cloud_workspace: CloudWorkspace,
 ) -> None:
     """Test deploying a source to a workspace."""
-    source = ab.get_source(
+    local_source: ab.Source = ab.get_source(
         "source-faker",
         local_executable="source-faker",
-        config={"count": 100},
+        config={"count": 100, "seed": 123},
         install_if_missing=False,
     )
-    source.check()
+    local_source.check()
 
-    source_id: str = cloud_workspace._deploy_source(source)
+    # Deploy source:
+    source_connector: cloud.CloudConnector = cloud_workspace.deploy_source(
+        source=local_source,
+        name="My Faker Source (DELETEME)",  # Used in deduplication and idempotency
+    )
+    assert source_connector.name == "My Faker Source (DELETEME)"
+    assert source_connector.configuration["count"] == 100
+    assert source_connector.configuration["seed"] == 123
 
-    cloud_workspace._permanently_delete_source(source=source_id)
+    with raises(exc.AirbyteResourceAlreadyExistsError):
+        # Deploying the same source name again should fail:
+        cloud_workspace.deploy_source(
+            source=local_source,
+            name="My Faker Source (DELETEME)",  # Matches the existing source name
+        )
+
+    # Partially update the configuration (merging with config from previous deployment):
+    source_connector.update_configuration(
+        {"count": 300},
+        merge=True,
+    )
+    assert source_connector.configuration["count"] == 300
+    assert source_connector.configuration["seed"] == 123
+
+    # Fully replace the configuration:
+    source_connector.update_configuration(
+        {"count": 300},
+        merge=False,
+    )
+    assert source_connector.configuration["count"] == 300
+    assert "seed" not in source_connector.configuration
+
+    # Delete the deployed source connector:
+    source_connector.permanently_delete_connector()
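The `merge=True` / `merge=False` assertions above imply dict-style semantics for `update_configuration`: a shallow merge over the previous config versus a full replacement. A hypothetical pure-Python illustration of those semantics (not the library's actual implementation):

    def merged_config(old: dict, new: dict, *, merge: bool) -> dict:
        """Illustrative only: shallow-merge `new` over `old`, or replace outright."""
        return {**old, **new} if merge else dict(new)


    old = {"count": 100, "seed": 123}
    assert merged_config(old, {"count": 300}, merge=True) == {"count": 300, "seed": 123}
    assert merged_config(old, {"count": 300}, merge=False) == {"count": 300}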
 
 
 def test_deploy_cache_as_destination(
@@ -38,8 +86,11 @@ def test_deploy_cache_as_destination(
         database="temp",
         schema_name="public",
     )
-    destination_id: str = cloud_workspace._deploy_cache_as_destination(cache=cache)
-    cloud_workspace._permanently_delete_destination(destination=destination_id)
+    destination_id: str = cloud_workspace.deploy_cache_as_destination(
+        cache=cache,
+        name="My MotherDuck Destination (DELETEME)",  # Used in deduplication and idempotency
+    )
+    cloud_workspace.permanently_delete_destination(destination=destination_id)
 
 
 def test_deploy_connection(
@@ -62,15 +113,25 @@ def test_deploy_connection(
         table_prefix="abc_deleteme_",
         # table_suffix="",  # Suffix not supported in CloudConnection
     )
-
-    connection: CloudConnection = cloud_workspace._deploy_connection(
+    source_connector: cloud.CloudConnector = cloud_workspace.deploy_source(
         source=source,
+        name="My Faker Source (DELETEME)",  # Used in deduplication and idempotency
+    )
+    destination_id: str = cloud_workspace.deploy_cache_as_destination(
         cache=cache,
+        name="My MotherDuck Destination (DELETEME)",  # Used in deduplication and idempotency
+    )
+
+    connection: CloudConnection = cloud_workspace.deploy_connection(
+        name="My Connection (DELETEME)",  # Used in deduplication and idempotency
+        source=source_connector,
+        destination=destination_id,
+        table_prefix=cache.table_prefix,
+        selected_streams=source.get_selected_streams(),
     )
     assert set(connection.stream_names) == set(["users", "products", "purchases"])
     assert connection.table_prefix == "abc_deleteme_"
     # assert connection.table_suffix == ""  # Suffix not supported in CloudConnection
 
-    cloud_workspace._permanently_delete_connection(
+    cloud_workspace.permanently_delete_connection(
         connection=connection,
         delete_source=True,
         delete_destination=True,
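Taken together, the tests above exercise the full public workflow. An end-to-end sketch of that workflow against the new API surface, assuming all names, IDs, and credentials below are placeholders:

    import airbyte as ab
    from airbyte.caches import MotherDuckCache
    from airbyte.cloud import CloudWorkspace

    workspace = CloudWorkspace(workspace_id="<workspace-id>", api_key="<api-key>")  # placeholders

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_all_streams()

    source_connector = workspace.deploy_source(source=source, name="Faker (Example)")
    destination_id = workspace.deploy_cache_as_destination(
        cache=MotherDuckCache(api_key="<md-key>", database="temp", schema_name="public"),
        name="MotherDuck (Example)",
    )
    connection = workspace.deploy_connection(
        "Faker to MotherDuck (Example)",
        source=source_connector,
        destination=destination_id,
        selected_streams=source.get_selected_streams(),
    )

    sync_result = workspace.run_sync(connection=connection)
    sync_result.raise_failure_status()

    # Tear down in reverse order:
    workspace.permanently_delete_connection(
        connection=connection,
        delete_source=True,
        delete_destination=True,
    )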