From abbb25634039d6d0b931d35fe3976cfeba444f20 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 09:59:55 -0800 Subject: [PATCH 01/28] new exception type: AirbyteConnectorNotRegisteredError --- .../_factories/connector_factories.py | 42 +++++++++++++------ airbyte-lib/airbyte_lib/exceptions.py | 10 ++++- airbyte-lib/airbyte_lib/registry.py | 8 +--- .../tests/integration_tests/test_install.py | 18 ++++++++ 4 files changed, 59 insertions(+), 19 deletions(-) create mode 100644 airbyte-lib/tests/integration_tests/test_install.py diff --git a/airbyte-lib/airbyte_lib/_factories/connector_factories.py b/airbyte-lib/airbyte_lib/_factories/connector_factories.py index 4dbe8c6f41f0..49fa139c154c 100644 --- a/airbyte-lib/airbyte_lib/_factories/connector_factories.py +++ b/airbyte-lib/airbyte_lib/_factories/connector_factories.py @@ -3,9 +3,9 @@ from typing import Any +from airbyte_lib import exceptions as exc from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor -from airbyte_lib.exceptions import AirbyteLibInputError -from airbyte_lib.registry import get_connector_metadata +from airbyte_lib.registry import ConnectorMetadata, get_connector_metadata from airbyte_lib.source import Source @@ -35,28 +35,46 @@ def get_connector( install_if_missing: whether to install the connector if it is not available locally. This parameter is ignored if use_local_install is True. """ - metadata = get_connector_metadata(name) + if use_local_install and pip_url: + raise exc.AirbyteLibInputError( + message="Param 'pip_url' is not supported when 'use_local_install' is True." + ) + + if use_local_install and version: + raise exc.AirbyteLibInputError( + message="Param 'version' is not supported when 'use_local_install' is True." + ) + + if use_local_install and install_if_missing: + raise exc.AirbyteLibInputError( + message="Param 'install_if_missing' is not supported when 'use_local_install' is True." + ) + + metadata: ConnectorMetadata | None = None + try: + metadata = get_connector_metadata(name) + except exc.AirbyteConnectorNotRegisteredError: + if not pip_url: + raise + if use_local_install: - if pip_url: - raise AirbyteLibInputError( - message="Param 'pip_url' is not supported when 'use_local_install' is True." - ) - if version: - raise AirbyteLibInputError( - message="Param 'version' is not supported when 'use_local_install' is True." - ) executor: Executor = PathExecutor( - metadata=metadata, + name=name, target_version=version, ) else: executor = VenvExecutor( + name=name, metadata=metadata, target_version=version, install_if_missing=install_if_missing, pip_url=pip_url, ) + + if install_if_missing: + executor.ensure_installation() + return Source( executor=executor, name=name, diff --git a/airbyte-lib/airbyte_lib/exceptions.py b/airbyte-lib/airbyte_lib/exceptions.py index 3c6336d03103..5776708686a8 100644 --- a/airbyte-lib/airbyte_lib/exceptions.py +++ b/airbyte-lib/airbyte_lib/exceptions.py @@ -174,6 +174,13 @@ class AirbyteConnectorRegistryError(AirbyteError): """Error when accessing the connector registry.""" +@dataclass +class AirbyteConnectorNotRegisteredError(AirbyteConnectorRegistryError): + """Connector not found in registry.""" + + connector_name: str | None = None + guidance = "Please double check the connector name." + # Connector Errors @@ -185,7 +192,8 @@ class AirbyteConnectorError(AirbyteError): class AirbyteConnectorNotFoundError(AirbyteConnectorError): - """Connector not found.""" + """Connector name not found in registry.""" + class AirbyteConnectorInstallationError(AirbyteConnectorError): diff --git a/airbyte-lib/airbyte_lib/registry.py b/airbyte-lib/airbyte_lib/registry.py index bd030a867ff0..e3e12796c9d1 100644 --- a/airbyte-lib/airbyte_lib/registry.py +++ b/airbyte-lib/airbyte_lib/registry.py @@ -48,11 +48,7 @@ def get_connector_metadata(name: str) -> ConnectorMetadata: if not _cache: _update_cache() if not _cache or name not in _cache: - raise exc.AirbyteLibInputError( - message="Connector name not found in registry.", - guidance="Please double check the connector name.", - context={ - "connector_name": name, - }, + raise exc.AirbyteConnectorNotRegisteredError( + connector_name=name, ) return _cache[name] diff --git a/airbyte-lib/tests/integration_tests/test_install.py b/airbyte-lib/tests/integration_tests/test_install.py new file mode 100644 index 000000000000..694b186fe3e9 --- /dev/null +++ b/airbyte-lib/tests/integration_tests/test_install.py @@ -0,0 +1,18 @@ +from gettext import install +import pytest + +from airbyte_lib._factories.connector_factories import get_connector +from airbyte_lib import exceptions as exc + + +def test_install_failure_log_pypi(): + """Test that the install log is created and contains the expected content.""" + with pytest.raises(exc.AirbyteConnectorNotRegisteredError): + source = get_connector("source-not-found") + + with pytest.raises(exc.AirbyteConnectorInstallationError): + source = get_connector( + "source-not-found", + pip_url="https://pypi.org/project/airbyte-not-found", + install_if_missing=True, + ) From 3845f5c5586238da8462fb4dc1c372f256cacee2 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 10:00:59 -0800 Subject: [PATCH 02/28] make constructors more resilient --- airbyte-lib/airbyte_lib/_executor.py | 71 +++++++++++++--------------- airbyte-lib/airbyte_lib/source.py | 26 ++++++++-- 2 files changed, 57 insertions(+), 40 deletions(-) diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index 20899f892006..06afce3fb5e2 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -24,15 +24,25 @@ class Executor(ABC): def __init__( self, - metadata: ConnectorMetadata, + *, + name: str | None = None, + metadata: ConnectorMetadata | None = None, target_version: str | None = None, ) -> None: - self.metadata = metadata - self.enforce_version = target_version is not None - if target_version is None or target_version == _LATEST_VERSION: - self.target_version = metadata.latest_available_version - else: + if name is None and metadata is None: + raise exc.AirbyteLibInternalError( + message="Either name or metadata must be provided." + ) + + self.name: str = name or metadata.name + self.metadata: ConnectorMetadata | None = metadata + self.enforce_version: bool = target_version is not None + + self.target_version: str | None = None + if target_version is not None: self.target_version = target_version + elif metadata and (target_version is None or target_version == _LATEST_VERSION): + self.target_version = metadata.latest_available_version @abstractmethod def execute(self, args: list[str]) -> Iterator[str]: @@ -107,25 +117,24 @@ def _stream_from_file(file: IO[str]) -> Generator[str, Any, None]: class VenvExecutor(Executor): def __init__( self, - metadata: ConnectorMetadata, + name: str | None = None, + *, + metadata: ConnectorMetadata | None = None, target_version: str | None = None, pip_url: str | None = None, - *, - install_if_missing: bool = False, ) -> None: - super().__init__(metadata, target_version) - self.install_if_missing = install_if_missing + super().__init__(name=name, metadata=metadata, target_version=target_version) # This is a temporary install path that will be replaced with a proper package # name once they are published. - # TODO: Replace with `f"airbyte-{self.metadata.name}"` - self.pip_url = pip_url or f"../airbyte-integrations/connectors/{self.metadata.name}" + # TODO: Replace with `f"airbyte-{self.name}"` + self.pip_url = pip_url or f"../airbyte-integrations/connectors/{self.name}" def _get_venv_name(self) -> str: - return f".venv-{self.metadata.name}" + return f".venv-{self.name}" def _get_connector_path(self) -> Path: - return Path(self._get_venv_name(), "bin", self.metadata.name) + return Path(self._get_venv_name(), "bin", self.name) def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None: result = subprocess.run(args, check=False) @@ -154,7 +163,7 @@ def _get_installed_version(self) -> str: > python -c "from importlib.metadata import version; print(version(''))" """ venv_name = self._get_venv_name() - connector_name = self.metadata.name + connector_name = self.name return subprocess.check_output( [ Path(venv_name) / "bin" / "python", @@ -176,27 +185,15 @@ def ensure_installation( Note: Version verification is not supported for connectors installed from a local path. """ - venv_name = f".venv-{self.metadata.name}" + venv_name = f".venv-{self.name}" venv_path = Path(venv_name) if not venv_path.exists(): - if not self.install_if_missing: - raise exc.AirbyteConnectorNotFoundError( - message="Connector not available and venv does not exist.", - guidance=( - "Please ensure the connector is pre-installed or consider enabling " - "`install_if_missing=True`." - ), - context={ - "connector_name": self.metadata.name, - "venv_name": venv_name, - }, - ) self.install() connector_path = self._get_connector_path() if not connector_path.exists(): raise exc.AirbyteConnectorNotFoundError( - connector_name=self.metadata.name, + connector_name=self.name, context={ "venv_name": venv_name, }, @@ -212,7 +209,7 @@ def ensure_installation( version_after_install = self._get_installed_version() if version_after_install != self.target_version: raise exc.AirbyteConnectorInstallationError( - connector_name=self.metadata.name, + connector_name=self.name, context={ "venv_name": venv_name, "target_version": self.target_version, @@ -228,7 +225,7 @@ def execute(self, args: list[str]) -> Iterator[str]: yield from stream def get_telemetry_info(self) -> SourceTelemetryInfo: - return SourceTelemetryInfo(self.metadata.name, SourceType.VENV, self.target_version) + return SourceTelemetryInfo(self.name, SourceType.VENV, self.target_version) class PathExecutor(Executor): @@ -237,24 +234,24 @@ def ensure_installation(self) -> None: self.execute(["spec"]) except Exception as e: raise exc.AirbyteConnectorNotFoundError( - connector_name=self.metadata.name, + connector_name=self.name, ) from e def install(self) -> NoReturn: raise exc.AirbyteConnectorInstallationError( message="Connector cannot be installed because it is not managed by airbyte-lib.", - connector_name=self.metadata.name, + connector_name=self.name, ) def uninstall(self) -> NoReturn: raise exc.AirbyteConnectorInstallationError( message="Connector cannot be uninstalled because it is not managed by airbyte-lib.", - connector_name=self.metadata.name, + connector_name=self.name, ) def execute(self, args: list[str]) -> Iterator[str]: - with _stream_from_subprocess([self.metadata.name, *args]) as stream: + with _stream_from_subprocess([self.name, *args]) as stream: yield from stream def get_telemetry_info(self) -> SourceTelemetryInfo: - return SourceTelemetryInfo(self.metadata.name, SourceType.LOCAL_INSTALL, version=None) + return SourceTelemetryInfo(self.name, SourceType.LOCAL_INSTALL, version=None) diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index 91ed050faad1..b91dc1cf0b79 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -68,7 +68,13 @@ def __init__( name: str, config: dict[str, Any] | None = None, streams: list[str] | None = None, + *, + validate: bool = False, ) -> None: + """Initialize the source. + + If config is provided, it will be validated against the spec if validate is True. + """ self._processed_records = 0 self.executor = executor self.name = name @@ -79,7 +85,7 @@ def __init__( self._spec: ConnectorSpecification | None = None self._selected_stream_names: list[str] | None = None if config is not None: - self.set_config(config) + self.set_config(config, validate=validate) if streams is not None: self.set_streams(streams) @@ -102,10 +108,24 @@ def set_streams(self, streams: list[str]) -> None: ) self._selected_stream_names = streams - def set_config(self, config: dict[str, Any]) -> None: - self._validate_config(config) + def set_config( + self, + config: dict[str, Any], + *, + validate: bool = False, + ) -> None: + """Set the config for the connector. + + If validate is True, raise an exception if the config fails validation. + + If validate is False, validation will be deferred until check() is called. + """ + if validate: + self._validate_config(config) + self._config_dict = config + @property def _config(self) -> dict[str, Any]: if self._config_dict is None: From 9fccace4ef653d09b310de7330c2d72b6c675923 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 10:27:02 -0800 Subject: [PATCH 03/28] print stderr in exception text, cleanup failed install, remove editable flag during install --- airbyte-lib/airbyte_lib/_executor.py | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index 06afce3fb5e2..d8290ba78102 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -4,7 +4,7 @@ import subprocess import sys from abc import ABC, abstractmethod -from contextlib import contextmanager +from contextlib import contextmanager, suppress from pathlib import Path from typing import IO, TYPE_CHECKING, Any, NoReturn @@ -137,10 +137,16 @@ def _get_connector_path(self) -> Path: return Path(self._get_venv_name(), "bin", self.name) def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None: - result = subprocess.run(args, check=False) + result = subprocess.run( + args, + check=False, + stderr=subprocess.PIPE, + ) if result.returncode != 0: - raise exc.AirbyteConnectorInstallationError from exc.AirbyteSubprocessFailedError( - exit_code=result.returncode + raise exc.AirbyteSubprocessFailedError( + run_args=args, + exit_code=result.returncode, + log_text=result.stderr.decode("utf-8"), ) def uninstall(self) -> None: @@ -154,7 +160,18 @@ def install(self) -> None: pip_path = str(Path(venv_name) / "bin" / "pip") - self._run_subprocess_and_raise_on_failure([pip_path, "install", "-e", self.pip_url]) + try: + self._run_subprocess_and_raise_on_failure( + args=[pip_path, "install", *self.pip_url.split(" ")] + ) + except exc.AirbyteSubprocessFailedError as ex: + # If the installation failed, remove the virtual environment + # Otherwise, the connector will be considered as installed and the user may not be able + # to retry the installation. + with suppress(exc.AirbyteSubprocessFailedError): + self.uninstall() + + raise exc.AirbyteConnectorInstallationError from ex def _get_installed_version(self) -> str: """Detect the version of the connector installed. From a217a6ee463aac027393e90c12653943264c75b8 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 10:27:39 -0800 Subject: [PATCH 04/28] move auto-install out of venv constructor, for easier debugging --- airbyte-lib/airbyte_lib/_factories/connector_factories.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-lib/airbyte_lib/_factories/connector_factories.py b/airbyte-lib/airbyte_lib/_factories/connector_factories.py index 49fa139c154c..230f386f1159 100644 --- a/airbyte-lib/airbyte_lib/_factories/connector_factories.py +++ b/airbyte-lib/airbyte_lib/_factories/connector_factories.py @@ -68,12 +68,11 @@ def get_connector( name=name, metadata=metadata, target_version=version, - install_if_missing=install_if_missing, pip_url=pip_url, ) if install_if_missing: - executor.ensure_installation() + executor.install() return Source( executor=executor, From 6aa85d694c10d995aec69839161df97f3644bb43 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 10:28:03 -0800 Subject: [PATCH 05/28] add test to assert that install failure includes pip log text --- airbyte-lib/tests/integration_tests/test_install.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte-lib/tests/integration_tests/test_install.py b/airbyte-lib/tests/integration_tests/test_install.py index 694b186fe3e9..b5e822320503 100644 --- a/airbyte-lib/tests/integration_tests/test_install.py +++ b/airbyte-lib/tests/integration_tests/test_install.py @@ -10,9 +10,12 @@ def test_install_failure_log_pypi(): with pytest.raises(exc.AirbyteConnectorNotRegisteredError): source = get_connector("source-not-found") - with pytest.raises(exc.AirbyteConnectorInstallationError): + with pytest.raises(exc.AirbyteConnectorInstallationError) as exc_info: source = get_connector( "source-not-found", pip_url="https://pypi.org/project/airbyte-not-found", install_if_missing=True, ) + + # Check that the stderr log contains the expected content from a failed pip install + assert 'Could not install requirement' in str(exc_info.value.__cause__.log_text) From dddbc78ab1137a06818ef623b803110e1b37b017 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 10:28:23 -0800 Subject: [PATCH 06/28] update docs --- airbyte-lib/docs/generated/airbyte_lib.html | 18 ++++++++++++++---- .../docs/generated/airbyte_lib/cloud.html | 7 +++++++ 2 files changed, 21 insertions(+), 4 deletions(-) create mode 100644 airbyte-lib/docs/generated/airbyte_lib/cloud.html diff --git a/airbyte-lib/docs/generated/airbyte_lib.html b/airbyte-lib/docs/generated/airbyte_lib.html index 91c67884b510..d29e527d192b 100644 --- a/airbyte-lib/docs/generated/airbyte_lib.html +++ b/airbyte-lib/docs/generated/airbyte_lib.html @@ -409,13 +409,17 @@
Inherited Members
- Source( executor: airbyte_lib._executor.Executor, name: str, config: dict[str, typing.Any] | None = None, streams: list[str] | None = None) + Source( executor: airbyte_lib._executor.Executor, name: str, config: dict[str, typing.Any] | None = None, streams: list[str] | None = None, *, validate: bool = False)
- +

Initialize the source.

+ +

If config is provided, it will be validated against the spec if validate is True.

+
+
@@ -465,13 +469,19 @@
Inherited Members
def - set_config(self, config: dict[str, typing.Any]) -> None: + set_config(self, config: dict[str, typing.Any], *, validate: bool = False) -> None:
- +

Set the config for the connector.

+ +

If validate is True, raise an exception if the config fails validation.

+ +

If validate is False, validation will be deferred until check() is called.

+
+
diff --git a/airbyte-lib/docs/generated/airbyte_lib/cloud.html b/airbyte-lib/docs/generated/airbyte_lib/cloud.html new file mode 100644 index 000000000000..c0d27ca14eaa --- /dev/null +++ b/airbyte-lib/docs/generated/airbyte_lib/cloud.html @@ -0,0 +1,7 @@ + +
+
+ + + + \ No newline at end of file From b1d966b6bf12bd86569510fab5b37c439baa6449 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 11:24:36 -0800 Subject: [PATCH 07/28] auto-format --- airbyte-lib/airbyte_lib/_executor.py | 4 +--- airbyte-lib/airbyte_lib/exceptions.py | 2 +- airbyte-lib/airbyte_lib/source.py | 1 - 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index d8290ba78102..aff468cb39bc 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -30,9 +30,7 @@ def __init__( target_version: str | None = None, ) -> None: if name is None and metadata is None: - raise exc.AirbyteLibInternalError( - message="Either name or metadata must be provided." - ) + raise exc.AirbyteLibInternalError(message="Either name or metadata must be provided.") self.name: str = name or metadata.name self.metadata: ConnectorMetadata | None = metadata diff --git a/airbyte-lib/airbyte_lib/exceptions.py b/airbyte-lib/airbyte_lib/exceptions.py index 5776708686a8..f270069bf42e 100644 --- a/airbyte-lib/airbyte_lib/exceptions.py +++ b/airbyte-lib/airbyte_lib/exceptions.py @@ -181,6 +181,7 @@ class AirbyteConnectorNotRegisteredError(AirbyteConnectorRegistryError): connector_name: str | None = None guidance = "Please double check the connector name." + # Connector Errors @@ -195,7 +196,6 @@ class AirbyteConnectorNotFoundError(AirbyteConnectorError): """Connector name not found in registry.""" - class AirbyteConnectorInstallationError(AirbyteConnectorError): """Error when installing the connector.""" diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index b91dc1cf0b79..50f627f56a77 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -125,7 +125,6 @@ def set_config( self._config_dict = config - @property def _config(self) -> dict[str, Any]: if self._config_dict is None: From f61152a33aa40e0ba96ae89bb37a1ba7a0a2f2e1 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 14:33:29 -0800 Subject: [PATCH 08/28] update docs --- airbyte-lib/docs/generated/airbyte_lib.html | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte-lib/docs/generated/airbyte_lib.html b/airbyte-lib/docs/generated/airbyte_lib.html index d29e527d192b..dae64a375446 100644 --- a/airbyte-lib/docs/generated/airbyte_lib.html +++ b/airbyte-lib/docs/generated/airbyte_lib.html @@ -254,7 +254,7 @@
Inherited Members
def - get_connector( name: str, version: str | None = None, pip_url: str | None = None, config: dict[str, typing.Any] | None = None, *, use_local_install: bool = False, install_if_missing: bool = True) -> Source: + get_connector( name: str, version: str | None = None, pip_url: str | None = None, config: dict[str, typing.Any] | None = None, *, local_executable: pathlib.Path | str | None = None, install_if_missing: bool = True) -> Source:
@@ -271,11 +271,11 @@
Inherited Members
connector name. config: connector config - if not provided, you need to set it later via the set_config method. - use_local_install: whether to use a virtual environment to run the connector. If True, the - connector is expected to be available on the path (e.g. installed via pip). If False, - the connector will be installed automatically in a virtual environment. - install_if_missing: whether to install the connector if it is not available locally. This - parameter is ignored if use_local_install is True.

+ local_executable: If set, the connector will be assumed to already be installed and will be + executed using this path or executable name. Otherwise, the connector will be installed + automatically in a virtual environment. + install_if_missing: Whether to install the connector if it is not available locally. This + parameter is ignored when local_executable is set.

From d6650883d2891b8fb9cc643c84e6b98c15cc7351 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 14:37:06 -0800 Subject: [PATCH 09/28] refactor version handling, control for side effects --- airbyte-lib/airbyte_lib/_executor.py | 218 ++++++++++++++---- .../_factories/connector_factories.py | 75 +++--- airbyte-lib/airbyte_lib/source.py | 32 ++- airbyte-lib/airbyte_lib/validate.py | 10 +- .../source-test/source_test/__init__.py | 0 .../integration_tests/test_integration.py | 86 +++++-- 6 files changed, 310 insertions(+), 111 deletions(-) create mode 100644 airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/__init__.py diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index aff468cb39bc..1379fa0bfe9c 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -6,16 +6,17 @@ from abc import ABC, abstractmethod from contextlib import contextmanager, suppress from pathlib import Path -from typing import IO, TYPE_CHECKING, Any, NoReturn +from shutil import rmtree +from typing import IO, TYPE_CHECKING, Any, NoReturn, cast from airbyte_lib import exceptions as exc +from airbyte_lib.registry import ConnectorMetadata from airbyte_lib.telemetry import SourceTelemetryInfo, SourceType if TYPE_CHECKING: from collections.abc import Generator, Iterable, Iterator - from airbyte_lib.registry import ConnectorMetadata _LATEST_VERSION = "latest" @@ -29,25 +30,28 @@ def __init__( metadata: ConnectorMetadata | None = None, target_version: str | None = None, ) -> None: - if name is None and metadata is None: + if not name and not metadata: raise exc.AirbyteLibInternalError(message="Either name or metadata must be provided.") - self.name: str = name or metadata.name + self.name: str = name or cast(ConnectorMetadata, metadata).name # metadata is not None here self.metadata: ConnectorMetadata | None = metadata self.enforce_version: bool = target_version is not None + self.reported_version: str | None = None self.target_version: str | None = None - if target_version is not None: - self.target_version = target_version - elif metadata and (target_version is None or target_version == _LATEST_VERSION): - self.target_version = metadata.latest_available_version + if target_version: + if metadata and target_version == _LATEST_VERSION: + self.target_version = metadata.latest_available_version + else: + self.target_version = target_version @abstractmethod def execute(self, args: list[str]) -> Iterator[str]: pass @abstractmethod - def ensure_installation(self) -> None: + def ensure_installation(self, *, auto_fix: bool = True) -> None: + _ = auto_fix pass @abstractmethod @@ -109,7 +113,10 @@ def _stream_from_file(file: IO[str]) -> Generator[str, Any, None]: # If the exit code is not 0 or -15 (SIGTERM), raise an exception if exit_code not in (0, -15): - raise exc.AirbyteSubprocessFailedError(exit_code=exit_code) + raise exc.AirbyteSubprocessFailedError( + run_args=args, + exit_code=exit_code, + ) class VenvExecutor(Executor): @@ -120,19 +127,34 @@ def __init__( metadata: ConnectorMetadata | None = None, target_version: str | None = None, pip_url: str | None = None, + install_root: Path | None = None, ) -> None: + """Initialize a connector executor that runs a connector in a virtual environment. + + Args: + name: The name of the connector. + metadata: (Optional.) The metadata of the connector. + target_version: (Optional.) The version of the connector to install. + pip_url: (Optional.) The pip URL of the connector to install. + install_root: (Optional.) The root directory where the virtual environment will be + created. If not provided, the current working directory will be used. + """ super().__init__(name=name, metadata=metadata, target_version=target_version) # This is a temporary install path that will be replaced with a proper package # name once they are published. # TODO: Replace with `f"airbyte-{self.name}"` self.pip_url = pip_url or f"../airbyte-integrations/connectors/{self.name}" + self.install_root = install_root or Path.cwd() def _get_venv_name(self) -> str: return f".venv-{self.name}" + def _get_venv_path(self) -> Path: + return self.install_root / self._get_venv_name() + def _get_connector_path(self) -> Path: - return Path(self._get_venv_name(), "bin", self.name) + return self._get_venv_path() / "bin" / self.name def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None: result = subprocess.run( @@ -148,15 +170,21 @@ def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None: ) def uninstall(self) -> None: - venv_name = self._get_venv_name() - if Path(venv_name).exists(): - self._run_subprocess_and_raise_on_failure(["rm", "-rf", venv_name]) + if self._get_venv_path().exists(): + rmtree(str(self._get_venv_path())) + + self.reported_version = None # Reset the reported version from the previous installation def install(self) -> None: - venv_name = self._get_venv_name() - self._run_subprocess_and_raise_on_failure([sys.executable, "-m", "venv", venv_name]) + """Install the connector in a virtual environment. - pip_path = str(Path(venv_name) / "bin" / "pip") + After installation, the installed version will be stored in self.reported_version. + """ + self._run_subprocess_and_raise_on_failure( + [sys.executable, "-m", "venv", str(self._get_venv_path())] + ) + + pip_path = str(self._get_venv_path() / "bin" / "pip") try: self._run_subprocess_and_raise_on_failure( @@ -171,25 +199,51 @@ def install(self) -> None: raise exc.AirbyteConnectorInstallationError from ex - def _get_installed_version(self) -> str: + # Assuming the installation succeeded, store the installed version + self.reported_version = self._get_installed_version(raise_on_error=False, recheck=True) + + + def _get_installed_version( + self, + *, + raise_on_error: bool = False, + recheck: bool = False, + ) -> str | None: """Detect the version of the connector installed. + Returns the version string if it can be detected, otherwise None. + + If raise_on_error is True, raise an exception if the version cannot be detected. + + If recheck if False and the version has already been detected, return the cached value. + In the venv, we run the following: > python -c "from importlib.metadata import version; print(version(''))" """ - venv_name = self._get_venv_name() + if not recheck and self.reported_version: + return self.reported_version + connector_name = self.name - return subprocess.check_output( - [ - Path(venv_name) / "bin" / "python", - "-c", - f"from importlib.metadata import version; print(version('{connector_name}'))", - ], - universal_newlines=True, - ).strip() + + try: + return subprocess.check_output( + [ + self._get_venv_path() / "bin" / "python", + "-c", + f"from importlib.metadata import version; print(version('{connector_name}'))", + ], + universal_newlines=True, + ).strip() + except subprocess.CalledProcessError: + if not raise_on_error: + raise + + return None def ensure_installation( self, + *, + auto_fix: bool = True, ) -> None: """Ensure that the connector is installed in a virtual environment. @@ -200,36 +254,77 @@ def ensure_installation( Note: Version verification is not supported for connectors installed from a local path. """ + # Store the installed version (or None if not installed) + if not self.reported_version: + self.reported_version = self._get_installed_version() + + original_installed_version = self.reported_version + + reinstalled = False venv_name = f".venv-{self.name}" - venv_path = Path(venv_name) - if not venv_path.exists(): + if not self._get_venv_path().exists(): + if not auto_fix: + raise exc.AirbyteConnectorInstallationError( + message="Virtual environment does not exist.", + connector_name=self.name, + context={ + "venv_path": self._get_venv_path(), + }, + ) + + # If the venv path does not exist, install. self.install() + reinstalled = True + + elif not self._get_connector_path().exists(): + if not auto_fix: + raise exc.AirbyteConnectorInstallationError( + message="Could not locate connector executable within the virtual environment.", + connector_name=self.name, + context={ + "connector_path": self._get_connector_path(), + }, + ) + + # If the connector path does not exist, uninstall and re-install. + # This is sometimes caused by a failed or partial installation. + self.uninstall() + self.install() + reinstalled = True + + # By now, everything should be installed. Raise an exception if not. connector_path = self._get_connector_path() if not connector_path.exists(): - raise exc.AirbyteConnectorNotFoundError( + raise exc.AirbyteConnectorInstallationError( + message="Connector's executable could not be found within the virtual environment.", connector_name=self.name, context={ - "venv_name": venv_name, + "connector_path": self._get_connector_path(), }, ) from FileNotFoundError(connector_path) if self.enforce_version: - installed_version = self._get_installed_version() - if installed_version != self.target_version: - # If the version doesn't match, reinstall - self.install() + version_after_reinstall: str | None = None + if self.reported_version != self.target_version: + if auto_fix and not reinstalled: + # If we haven't already reinstalled above, reinstall now. + self.install() + reinstalled = True + + if reinstalled: + version_after_reinstall = self.reported_version # Check the version again - version_after_install = self._get_installed_version() - if version_after_install != self.target_version: + if self.reported_version != self.target_version: raise exc.AirbyteConnectorInstallationError( + message="Connector's reported version does not match the target version.", connector_name=self.name, context={ "venv_name": venv_name, "target_version": self.target_version, - "installed_version": installed_version, - "version_after_install": version_after_install, + "original_installed_version": original_installed_version, + "version_after_reinstall": version_after_reinstall, }, ) @@ -240,14 +335,47 @@ def execute(self, args: list[str]) -> Iterator[str]: yield from stream def get_telemetry_info(self) -> SourceTelemetryInfo: - return SourceTelemetryInfo(self.name, SourceType.VENV, self.target_version) + return SourceTelemetryInfo( + name=self.name, + type=SourceType.VENV, + version=self.reported_version, + ) class PathExecutor(Executor): - def ensure_installation(self) -> None: + + def __init__( + self, + name: str | None = None, + *, + path: Path, + target_version: str | None = None, + ) -> None: + """Initialize a connector executor that runs a connector from a local path. + + If path is simply the name of the connector, it will be expected to exist in the current + PATH or in the current working directory. + """ + self.path: Path = path + name = name or path.name + super().__init__(name=name, target_version=target_version) + + def ensure_installation( + self, + *, + auto_fix: bool = True, + ) -> None: + """Ensure that the connector executable can be found. + + The auto_fix parameter is ignored for this executor type. + """ + _ = auto_fix try: self.execute(["spec"]) except Exception as e: + # TODO: Improve error handling. We should try to distinguish between + # a connector that is not installed and a connector that is not + # working properly. raise exc.AirbyteConnectorNotFoundError( connector_name=self.name, ) from e @@ -265,8 +393,12 @@ def uninstall(self) -> NoReturn: ) def execute(self, args: list[str]) -> Iterator[str]: - with _stream_from_subprocess([self.name, *args]) as stream: + with _stream_from_subprocess([self.path, *args]) as stream: yield from stream def get_telemetry_info(self) -> SourceTelemetryInfo: - return SourceTelemetryInfo(self.name, SourceType.LOCAL_INSTALL, version=None) + return SourceTelemetryInfo( + self.path, + SourceType.LOCAL_INSTALL, + version=self.reported_version, + ) diff --git a/airbyte-lib/airbyte_lib/_factories/connector_factories.py b/airbyte-lib/airbyte_lib/_factories/connector_factories.py index 230f386f1159..322305c48893 100644 --- a/airbyte-lib/airbyte_lib/_factories/connector_factories.py +++ b/airbyte-lib/airbyte_lib/_factories/connector_factories.py @@ -1,7 +1,9 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. from __future__ import annotations -from typing import Any +import shutil +from pathlib import Path +from typing import TYPE_CHECKING, Any from airbyte_lib import exceptions as exc from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor @@ -15,7 +17,7 @@ def get_connector( pip_url: str | None = None, config: dict[str, Any] | None = None, *, - use_local_install: bool = False, + local_executable: Path | str | None = None, install_if_missing: bool = True, ) -> Source: """Get a connector by name and version. @@ -29,25 +31,39 @@ def get_connector( connector name. config: connector config - if not provided, you need to set it later via the set_config method. - use_local_install: whether to use a virtual environment to run the connector. If True, the - connector is expected to be available on the path (e.g. installed via pip). If False, - the connector will be installed automatically in a virtual environment. - install_if_missing: whether to install the connector if it is not available locally. This - parameter is ignored if use_local_install is True. + local_executable: If set, the connector will be assumed to already be installed and will be + executed using this path or executable name. Otherwise, the connector will be installed + automatically in a virtual environment. + install_if_missing: Whether to install the connector if it is not available locally. This + parameter is ignored when local_executable is set. """ - if use_local_install and pip_url: - raise exc.AirbyteLibInputError( - message="Param 'pip_url' is not supported when 'use_local_install' is True." - ) + if local_executable: + if pip_url: + raise exc.AirbyteLibInputError( + message="Param 'pip_url' is not supported when 'local_executable' is set." + ) + if version: + raise exc.AirbyteLibInputError( + message="Param 'version' is not supported when 'local_executable' is set." + ) - if use_local_install and version: - raise exc.AirbyteLibInputError( - message="Param 'version' is not supported when 'use_local_install' is True." - ) + if isinstance(local_executable, str): + if "/" in local_executable or "\\" in local_executable: + # Assume this is a path + local_executable = Path(local_executable).absolute() + else: + which_executable = shutil.which(local_executable) + if which_executable is None: + raise FileNotFoundError(local_executable) + local_executable = Path(which_executable).absolute() - if use_local_install and install_if_missing: - raise exc.AirbyteLibInputError( - message="Param 'install_if_missing' is not supported when 'use_local_install' is True." + return Source( + name=name, + config=config, + executor=PathExecutor( + name=name, + path=local_executable, + ), ) metadata: ConnectorMetadata | None = None @@ -55,24 +71,17 @@ def get_connector( metadata = get_connector_metadata(name) except exc.AirbyteConnectorNotRegisteredError: if not pip_url: + # We don't have a pip url or registry entry, so we can't install the connector raise - if use_local_install: - executor: Executor = PathExecutor( - name=name, - target_version=version, - ) - - else: - executor = VenvExecutor( - name=name, - metadata=metadata, - target_version=version, - pip_url=pip_url, - ) - + executor = VenvExecutor( + name=name, + metadata=metadata, + target_version=version, + pip_url=pip_url, + ) if install_if_missing: - executor.install() + executor.ensure_installation() return Source( executor=executor, diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index 50f627f56a77..fda00928bdd0 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -2,6 +2,7 @@ from __future__ import annotations import json +from pathlib import Path import tempfile from contextlib import contextmanager, suppress from typing import TYPE_CHECKING, Any @@ -267,17 +268,23 @@ def check(self) -> None: * Make sure the subprocess is killed when the function returns. """ with as_temp_files([self._config]) as [config_file]: - for msg in self._execute(["check", "--config", config_file]): - if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus: - if msg.connectionStatus.status != Status.FAILED: - return # Success! - - raise exc.AirbyteConnectorCheckFailedError( - context={ - "message": msg.connectionStatus.message, - } - ) - raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages) + try: + for msg in self._execute(["check", "--config", config_file]): + if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus: + if msg.connectionStatus.status != Status.FAILED: + return # Success! + + raise exc.AirbyteConnectorCheckFailedError( + context={ + "message": msg.connectionStatus.message, + } + ) + raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages) + except exc.AirbyteConnectorReadError as ex: + raise exc.AirbyteConnectorCheckFailedError( + message="The connector failed to check the connection.", + log_text=ex.log_text, + ) from ex def install(self) -> None: """Install the connector if it is not yet installed.""" @@ -357,7 +364,8 @@ def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]: * Read the output line by line of the subprocess and serialize them AirbyteMessage objects. Drop if not valid. """ - self.executor.ensure_installation() + # Fail early if the connector is not installed. + self.executor.ensure_installation(auto_fix=False) try: self._last_log_messages = [] diff --git a/airbyte-lib/airbyte_lib/validate.py b/airbyte-lib/airbyte_lib/validate.py index c9ce3944a35a..113f46f326a9 100644 --- a/airbyte-lib/airbyte_lib/validate.py +++ b/airbyte-lib/airbyte_lib/validate.py @@ -37,11 +37,16 @@ def _parse_args() -> argparse.Namespace: def _run_subprocess_and_raise_on_failure(args: list[str]) -> None: - result = subprocess.run(args, check=False) + result = subprocess.run( + args, + check=False, + stderr=subprocess.PIPE, + ) if result.returncode != 0: raise exc.AirbyteSubprocessFailedError( run_args=args, exit_code=result.returncode, + log_text=result.stderr.decode("utf-8"), ) @@ -50,7 +55,8 @@ def tests(connector_name: str, sample_config: str) -> None: source = ab.get_connector( # TODO: FIXME: noqa: SIM115, PTH123 connector_name, - config=json.load(open(sample_config)), # noqa: SIM115, PTH123 + config=json.load(open(sample_config)), # noqa: SIM115, PTH123, + install_if_missing=False, ) print("Running check...") diff --git a/airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/__init__.py b/airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_integration.py index 1ea3e13cd7ce..5d626d654035 100644 --- a/airbyte-lib/tests/integration_tests/test_integration.py +++ b/airbyte-lib/tests/integration_tests/test_integration.py @@ -63,8 +63,9 @@ def test_list_streams(expected_test_stream_data: dict[str, list[dict[str, str | def test_invalid_config(): - with pytest.raises(Exception): - ab.get_connector("source-test", config={"apiKey": 1234}) + source = ab.get_connector("source-test", config={"apiKey": 1234}) + with pytest.raises(exc.AirbyteConnectorCheckFailedError): + source.check() def test_non_existing_connector(): @@ -96,17 +97,32 @@ def test_version_enforcement(raises, latest_available_version, requested_version _cache["source-test"].latest_available_version = latest_available_version if raises: with pytest.raises(Exception): - ab.get_connector("source-test", version=requested_version, config={"apiKey": "abc"}) + source = ab.get_connector( + "source-test", + version=requested_version, + config={"apiKey": "abc"}, + install_if_missing=False, + ) + source.executor.ensure_installation(auto_fix=False) else: - ab.get_connector("source-test", version=requested_version, config={"apiKey": "abc"}) + source = ab.get_connector( + "source-test", + version=requested_version, + config={"apiKey": "abc"}, + install_if_missing=False, + ) + source.executor.ensure_installation(auto_fix=False) # reset _cache["source-test"].latest_available_version = "0.0.1" def test_check(): - source = ab.get_connector("source-test", config={"apiKey": "test"}) - + source = ab.get_connector( + "source-test", + config={"apiKey": "test"}, + install_if_missing=False, + ) source.check() @@ -370,6 +386,13 @@ def test_sync_with_merge_to_postgres(new_pg_cache_config: PostgresCacheConfig, e check_dtype=False, ) + +def test_airbyte_lib_version() -> None: + assert get_version() + assert isinstance(get_version(), str) + assert len(get_version().split(".")) == 3 + + @patch.dict('os.environ', {'DO_NOT_TRACK': ''}) @patch('airbyte_lib.telemetry.requests') @patch('airbyte_lib.telemetry.datetime') @@ -512,27 +535,48 @@ def test_failing_path_connector(): ab.get_connector("source-test", config={"apiKey": "test"}, use_local_install=True) def test_succeeding_path_connector(): - old_path = os.environ["PATH"] + new_path = f"{os.path.abspath('.venv-source-test/bin')}:{os.environ['PATH']}" + + # Patch the PATH env var to include the test venv bin folder + with patch.dict(os.environ, {"PATH": new_path}): + source = ab.get_connector( + "source-test", + config={"apiKey": "test"}, + local_executable="source-test", + ) + source.check() - # set path to include the test venv bin folder - os.environ["PATH"] = f"{os.path.abspath('.venv-source-test/bin')}:{os.environ['PATH']}" - source = ab.get_connector("source-test", config={"apiKey": "test"}, use_local_install=True) - source.check() +def test_install_uninstall(): + with tempfile.TemporaryDirectory() as temp_dir: + source = ab.get_connector( + "source-test", + pip_url="./tests/integration_tests/fixtures/source-test", + config={"apiKey": "test"}, + install_if_missing=False, + ) - os.environ["PATH"] = old_path + # Override the install root to a temp dir + install_root = Path(temp_dir) + source.executor.install_root = install_root -def test_install_uninstall(): - source = ab.get_connector("source-test", pip_url="./tests/integration_tests/fixtures/source-test", config={"apiKey": "test"}, install_if_missing=False) + # assert that the venv is gone + assert not os.path.exists(install_root / ".venv-source-test") - source.uninstall() + # use which to check if the executable is available + assert shutil.which("source-test") is None - # assert that the venv is gone - assert not os.path.exists(".venv-source-test") + # assert that the connector is not available + with pytest.raises(Exception): + source.check() + + source.install() + + assert os.path.exists(install_root / ".venv-source-test") + assert os.path.exists(install_root / ".venv-source-test/bin/source-test") - # assert that the connector is not available - with pytest.raises(Exception): source.check() - source.install() + source.uninstall() - source.check() + assert not os.path.exists(install_root / ".venv-source-test") + assert not os.path.exists(install_root / ".venv-source-test/bin/source-test") From 809918b41527d09ba41a70212da757ed3b76db10 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 14:44:02 -0800 Subject: [PATCH 10/28] fix exception handling in _get_installed_version() --- airbyte-lib/airbyte_lib/_executor.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index 1379fa0bfe9c..355df286fc67 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -224,22 +224,37 @@ def _get_installed_version( return self.reported_version connector_name = self.name + if not self.interpreter_path.exists(): + # No point in trying to detect the version if the interpreter does not exist + if raise_on_error: + raise exc.AirbyteLibInternalError( + message="Connector's virtual environment interpreter could not be found.", + context={ + "interpreter_path": self.interpreter_path, + }, + ) + return None try: return subprocess.check_output( [ - self._get_venv_path() / "bin" / "python", + self.interpreter_path, "-c", f"from importlib.metadata import version; print(version('{connector_name}'))", ], universal_newlines=True, ).strip() - except subprocess.CalledProcessError: - if not raise_on_error: + except Exception: + if raise_on_error: raise return None + @property + def interpreter_path(self) -> Path: + return self._get_venv_path() / "bin" / "python" + + def ensure_installation( self, *, From 4a41ffb14162e649b7d42455fd5fd52dca8a570f Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 15:57:08 -0800 Subject: [PATCH 11/28] fix tests --- airbyte-lib/airbyte_lib/_executor.py | 8 ++---- .../_factories/connector_factories.py | 4 +-- airbyte-lib/airbyte_lib/source.py | 1 - .../integration_tests/test_integration.py | 26 ++++++++++++++++--- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index 355df286fc67..801d02361282 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -18,7 +18,6 @@ from collections.abc import Generator, Iterable, Iterator - _LATEST_VERSION = "latest" @@ -202,7 +201,6 @@ def install(self) -> None: # Assuming the installation succeeded, store the installed version self.reported_version = self._get_installed_version(raise_on_error=False, recheck=True) - def _get_installed_version( self, *, @@ -254,7 +252,6 @@ def _get_installed_version( def interpreter_path(self) -> Path: return self._get_venv_path() / "bin" / "python" - def ensure_installation( self, *, @@ -358,7 +355,6 @@ def get_telemetry_info(self) -> SourceTelemetryInfo: class PathExecutor(Executor): - def __init__( self, name: str | None = None, @@ -408,12 +404,12 @@ def uninstall(self) -> NoReturn: ) def execute(self, args: list[str]) -> Iterator[str]: - with _stream_from_subprocess([self.path, *args]) as stream: + with _stream_from_subprocess([str(self.path), *args]) as stream: yield from stream def get_telemetry_info(self) -> SourceTelemetryInfo: return SourceTelemetryInfo( - self.path, + str(self.name), SourceType.LOCAL_INSTALL, version=self.reported_version, ) diff --git a/airbyte-lib/airbyte_lib/_factories/connector_factories.py b/airbyte-lib/airbyte_lib/_factories/connector_factories.py index 322305c48893..197ed61142c6 100644 --- a/airbyte-lib/airbyte_lib/_factories/connector_factories.py +++ b/airbyte-lib/airbyte_lib/_factories/connector_factories.py @@ -3,10 +3,10 @@ import shutil from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import Any from airbyte_lib import exceptions as exc -from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor +from airbyte_lib._executor import PathExecutor, VenvExecutor from airbyte_lib.registry import ConnectorMetadata, get_connector_metadata from airbyte_lib.source import Source diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index fda00928bdd0..ee58286b6d06 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -2,7 +2,6 @@ from __future__ import annotations import json -from pathlib import Path import tempfile from contextlib import contextmanager, suppress from typing import TYPE_CHECKING, Any diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_integration.py index 5d626d654035..3e474ac3eff2 100644 --- a/airbyte-lib/tests/integration_tests/test_integration.py +++ b/airbyte-lib/tests/integration_tests/test_integration.py @@ -7,6 +7,7 @@ from unittest.mock import Mock, call, patch import tempfile from pathlib import Path +import pip from sqlalchemy import column, text @@ -57,17 +58,36 @@ def expected_test_stream_data() -> dict[str, list[dict[str, str | int]]]: } def test_list_streams(expected_test_stream_data: dict[str, list[dict[str, str | int]]]): - source = ab.get_connector("source-test", config={"apiKey": "test"}) - + source = ab.get_connector( + "source-test", config={"apiKey": "test"}, install_if_missing=False + ) assert source.get_available_streams() == list(expected_test_stream_data.keys()) def test_invalid_config(): - source = ab.get_connector("source-test", config={"apiKey": 1234}) + source = ab.get_connector( + "source-test", config={"apiKey": 1234}, install_if_missing=False + ) with pytest.raises(exc.AirbyteConnectorCheckFailedError): source.check() +def test_ensure_installation_detection(): + """Assert that install isn't called, since the connector is already installed by the fixture.""" + with patch("airbyte_lib._executor.VenvExecutor.install") as mock_venv_install, \ + patch("airbyte_lib.source.Source.install") as mock_source_install, \ + patch("airbyte_lib._executor.VenvExecutor.ensure_installation") as mock_ensure_installed: + source = ab.get_connector( + "source-test", + config={"apiKey": 1234}, + pip_url="https://pypi.org/project/airbyte-not-found", + install_if_missing=True, + ) + assert mock_ensure_installed.call_count == 1 + assert not mock_venv_install.called + assert not mock_source_install.called + + def test_non_existing_connector(): with pytest.raises(Exception): ab.get_connector("source-not-existing", config={"apiKey": "abc"}) From bab5e069c84b9e8eec33e782eac861d7b26d5126 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 17:13:13 -0800 Subject: [PATCH 12/28] improve thread safety --- airbyte-lib/airbyte_lib/registry.py | 70 +++++++++++++++---- .../integration_tests/test_integration.py | 58 +++++++++------ 2 files changed, 91 insertions(+), 37 deletions(-) diff --git a/airbyte-lib/airbyte_lib/registry.py b/airbyte-lib/airbyte_lib/registry.py index e3e12796c9d1..4a774336c21e 100644 --- a/airbyte-lib/airbyte_lib/registry.py +++ b/airbyte-lib/airbyte_lib/registry.py @@ -3,6 +3,8 @@ import json import os +import threading +from copy import copy from dataclasses import dataclass from pathlib import Path @@ -12,32 +14,60 @@ from airbyte_lib.version import get_version +__cache: dict[str, ConnectorMetadata] | None = None +_cache_lock = threading.Lock() + +REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY" +REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" + + @dataclass class ConnectorMetadata: name: str latest_available_version: str -_cache: dict[str, ConnectorMetadata] | None = None +def _get_registry_url() -> str: + if REGISTRY_ENV_VAR in os.environ: + return str(os.environ.get(REGISTRY_ENV_VAR)) -REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" + return REGISTRY_URL -def _update_cache() -> None: - global _cache - if os.environ.get("AIRBYTE_LOCAL_REGISTRY"): - with Path(str(os.environ.get("AIRBYTE_LOCAL_REGISTRY"))).open() as f: - data = json.load(f) - else: +def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]: + """Return the registry cache.""" + global __cache + if __cache and not force_refresh: + return __cache + + registry_url = _get_registry_url() + if registry_url.startswith("http"): response = requests.get( - REGISTRY_URL, headers={"User-Agent": f"airbyte-lib-{get_version()}"} + registry_url, headers={"User-Agent": f"airbyte-lib-{get_version()}"} ) response.raise_for_status() data = response.json() - _cache = {} + else: + # Assume local file + with Path(registry_url).open() as f: + data = json.load(f) + + new_cache: dict[str, ConnectorMetadata] = {} + for connector in data["sources"]: name = connector["dockerRepository"].replace("airbyte/", "") - _cache[name] = ConnectorMetadata(name, connector["dockerImageTag"]) + new_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"]) + + if len(new_cache) == 0: + raise exc.AirbyteLibInternalError( + message="Connector registry is empty.", + context={ + "registry_url": _get_registry_url(), + }, + ) + + __cache = new_cache + return __cache def get_connector_metadata(name: str) -> ConnectorMetadata: @@ -45,10 +75,20 @@ def get_connector_metadata(name: str) -> ConnectorMetadata: If the cache is empty, populate by calling update_cache. """ - if not _cache: - _update_cache() - if not _cache or name not in _cache: + cache = copy(_get_registry_cache()) + if not cache: + raise exc.AirbyteLibInternalError( + message="Connector registry could not be loaded.", + context={ + "registry_url": _get_registry_url(), + }, + ) + if name not in cache: raise exc.AirbyteConnectorNotRegisteredError( connector_name=name, + context={ + "registry_url": _get_registry_url(), + "available_connectors": sorted(cache.keys()), + }, ) - return _cache[name] + return cache[name] diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_integration.py index 3e474ac3eff2..b812662dc74c 100644 --- a/airbyte-lib/tests/integration_tests/test_integration.py +++ b/airbyte-lib/tests/integration_tests/test_integration.py @@ -3,11 +3,11 @@ from collections.abc import Mapping import os import shutil +import subprocess from typing import Any from unittest.mock import Mock, call, patch import tempfile from pathlib import Path -import pip from sqlalchemy import column, text @@ -17,7 +17,7 @@ import pytest from airbyte_lib.caches import PostgresCache, PostgresCacheConfig -from airbyte_lib.registry import _update_cache +from airbyte_lib import registry from airbyte_lib.version import get_version from airbyte_lib.results import ReadResult from airbyte_lib.datasets import CachedDataset, LazyDataset, SQLDataset @@ -27,24 +27,32 @@ from airbyte_lib import exceptions as exc -@pytest.fixture(scope="module", autouse=True) +LOCAL_TEST_REGISTRY_URL = "./tests/integration_tests/fixtures/registry.json" + + +@pytest.fixture(scope="package", autouse=True) def prepare_test_env(): """ Prepare test environment. This will pre-install the test source from the fixtures array and set the environment variable to use the local json file as registry. """ + venv_dir = f".venv-source-test" if os.path.exists(".venv-source-test"): shutil.rmtree(".venv-source-test") - os.system("python -m venv .venv-source-test") - os.system(".venv-source-test/bin/pip install -e ./tests/integration_tests/fixtures/source-test") + subprocess.run(["python", "-m", "venv", venv_dir], check=True) + subprocess.run([f"{venv_dir}/bin/pip", "install", "-e", "./tests/integration_tests/fixtures/source-test"], check=True) - os.environ["AIRBYTE_LOCAL_REGISTRY"] = "./tests/integration_tests/fixtures/registry.json" + os.environ["AIRBYTE_LOCAL_REGISTRY"] = LOCAL_TEST_REGISTRY_URL os.environ["DO_NOT_TRACK"] = "true" + # Force-refresh the registry cache + _ = registry._get_registry_cache(force_refresh=True) + yield shutil.rmtree(".venv-source-test") + @pytest.fixture def expected_test_stream_data() -> dict[str, list[dict[str, str | int]]]: return { @@ -57,6 +65,14 @@ def expected_test_stream_data() -> dict[str, list[dict[str, str | int]]]: ], } +def test_registry_get(): + assert registry._get_registry_url() == LOCAL_TEST_REGISTRY_URL + + metadata = registry.get_connector_metadata("source-test") + assert metadata.name == "source-test" + assert metadata.latest_available_version == "0.0.1" + + def test_list_streams(expected_test_stream_data: dict[str, list[dict[str, str | int]]]): source = ab.get_connector( "source-test", config={"apiKey": "test"}, install_if_missing=False @@ -112,11 +128,20 @@ def test_version_enforcement(raises, latest_available_version, requested_version In this test, the actually installed version is 0.0.1 """ - _update_cache() - from airbyte_lib.registry import _cache - _cache["source-test"].latest_available_version = latest_available_version - if raises: - with pytest.raises(Exception): + patched_entry = registry.ConnectorMetadata( + name="source-test", latest_available_version=latest_available_version + ) + with patch.dict("airbyte_lib.registry.__cache", {"source-test": patched_entry}, clear=False): + if raises: + with pytest.raises(Exception): + source = ab.get_connector( + "source-test", + version=requested_version, + config={"apiKey": "abc"}, + install_if_missing=False, + ) + source.executor.ensure_installation(auto_fix=False) + else: source = ab.get_connector( "source-test", version=requested_version, @@ -124,17 +149,6 @@ def test_version_enforcement(raises, latest_available_version, requested_version install_if_missing=False, ) source.executor.ensure_installation(auto_fix=False) - else: - source = ab.get_connector( - "source-test", - version=requested_version, - config={"apiKey": "abc"}, - install_if_missing=False, - ) - source.executor.ensure_installation(auto_fix=False) - - # reset - _cache["source-test"].latest_available_version = "0.0.1" def test_check(): From 10ce07738c58c949b687312acaf80d65c82e531a Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 20:36:25 -0800 Subject: [PATCH 13/28] handle quoted spaces in pip_url --- airbyte-lib/airbyte_lib/_executor.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index 801d02361282..0e0925fe9be7 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -1,5 +1,6 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. from __future__ import annotations +import shlex import subprocess import sys @@ -29,6 +30,10 @@ def __init__( metadata: ConnectorMetadata | None = None, target_version: str | None = None, ) -> None: + """Initialize a connector executor. + + The 'name' param is required if 'metadata' is None. + """ if not name and not metadata: raise exc.AirbyteLibInternalError(message="Either name or metadata must be provided.") @@ -187,7 +192,7 @@ def install(self) -> None: try: self._run_subprocess_and_raise_on_failure( - args=[pip_path, "install", *self.pip_url.split(" ")] + args=[pip_path, "install", *shlex.split(self.pip_url)] ) except exc.AirbyteSubprocessFailedError as ex: # If the installation failed, remove the virtual environment From 063bba34e75b49c8c7310ce065569ce132dbd3cf Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 20:36:42 -0800 Subject: [PATCH 14/28] fix import sorts --- airbyte-lib/airbyte_lib/_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index 0e0925fe9be7..a9d003ff5cae 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -1,7 +1,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. from __future__ import annotations -import shlex +import shlex import subprocess import sys from abc import ABC, abstractmethod From ab75be432c124c89f7f78a1c6bc822aaf70e8679 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 20:56:35 -0800 Subject: [PATCH 15/28] standalone validate_config() method --- airbyte-lib/airbyte_lib/source.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index ee58286b6d06..a0b19863125f 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -118,10 +118,11 @@ def set_config( If validate is True, raise an exception if the config fails validation. - If validate is False, validation will be deferred until check() is called. + If validate is False, validation will be deferred until check() or validate_config() + is called. """ if validate: - self._validate_config(config) + self.validate_config(config) self._config_dict = config @@ -150,9 +151,13 @@ def _discover(self) -> AirbyteCatalog: log_text=self._last_log_messages, ) - def _validate_config(self, config: dict[str, Any]) -> None: - """Validate the config against the spec.""" + def validate_config(self, config: dict[str, Any] | None = None) -> None: + """Validate the config against the spec. + + If config is not provided, the already-set config will be validated. + """ spec = self._get_spec(force_refresh=False) + config = self._config if config is None else config jsonschema.validate(config, spec.connectionSpecification) def get_available_streams(self) -> list[str]: From 8880b0b983379af3b22b56f28b60459c44ab0860 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 21:33:20 -0800 Subject: [PATCH 16/28] add Source.yaml_spec property --- airbyte-lib/airbyte_lib/source.py | 9 +++++++++ airbyte-lib/tests/integration_tests/test_integration.py | 7 +++++++ 2 files changed, 16 insertions(+) diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index a0b19863125f..717390c7783a 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any import jsonschema +import yaml from airbyte_protocol.models import ( AirbyteCatalog, @@ -185,6 +186,14 @@ def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: log_text=self._last_log_messages, ) + @property + def yaml_spec(self) -> str: + """Get the spec as a yaml string.""" + spec_obj: ConnectorSpecification = self._get_spec() + spec_dict = spec_obj.dict(exclude_unset=True) + # convert to a yaml string + return yaml.dump(spec_dict) + @property def discovered_catalog(self) -> AirbyteCatalog: """Get the raw catalog for the given streams. diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_integration.py index b812662dc74c..2dbb866f5d09 100644 --- a/airbyte-lib/tests/integration_tests/test_integration.py +++ b/airbyte-lib/tests/integration_tests/test_integration.py @@ -104,6 +104,13 @@ def test_ensure_installation_detection(): assert not mock_source_install.called +def test_source_yaml_spec(): + source = ab.get_connector( + "source-test", config={"apiKey": 1234}, install_if_missing=False + ) + assert source.yaml_spec.startswith("connectionSpecification:\n $schema:") + + def test_non_existing_connector(): with pytest.raises(Exception): ab.get_connector("source-not-existing", config={"apiKey": "abc"}) From 377314962d36bbb3e36d75024d137f207ee21896 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 21:59:32 -0800 Subject: [PATCH 17/28] make _yaml_spec a protected member --- airbyte-lib/airbyte_lib/source.py | 2 +- airbyte-lib/tests/integration_tests/test_integration.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index 717390c7783a..a68dcdb05b36 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -187,7 +187,7 @@ def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: ) @property - def yaml_spec(self) -> str: + def _yaml_spec(self) -> str: """Get the spec as a yaml string.""" spec_obj: ConnectorSpecification = self._get_spec() spec_dict = spec_obj.dict(exclude_unset=True) diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_integration.py index 2dbb866f5d09..8a8be464d758 100644 --- a/airbyte-lib/tests/integration_tests/test_integration.py +++ b/airbyte-lib/tests/integration_tests/test_integration.py @@ -108,7 +108,7 @@ def test_source_yaml_spec(): source = ab.get_connector( "source-test", config={"apiKey": 1234}, install_if_missing=False ) - assert source.yaml_spec.startswith("connectionSpecification:\n $schema:") + assert source._yaml_spec.startswith("connectionSpecification:\n $schema:") def test_non_existing_connector(): From 90918c809cf4642f57bb84001baa7384e60a0ac0 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Fri, 26 Jan 2024 22:13:08 -0800 Subject: [PATCH 18/28] fix too-limited json package_data glob --- airbyte-integrations/connectors/source-github/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-github/setup.py b/airbyte-integrations/connectors/source-github/setup.py index 8b5f90f29e12..88b88cb6df3d 100644 --- a/airbyte-integrations/connectors/source-github/setup.py +++ b/airbyte-integrations/connectors/source-github/setup.py @@ -16,7 +16,7 @@ author_email="contact@airbyte.io", packages=find_packages(), install_requires=MAIN_REQUIREMENTS, - package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]}, + package_data={"": ["**/*.json"]}, extras_require={ "tests": TEST_REQUIREMENTS, }, From 9197728171656ff538b6dc72186aa5b06375a2ec Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 28 Jan 2024 09:05:18 -0800 Subject: [PATCH 19/28] fix missing copyright str --- airbyte-lib/tests/integration_tests/test_install.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-lib/tests/integration_tests/test_install.py b/airbyte-lib/tests/integration_tests/test_install.py index b5e822320503..c93801489f37 100644 --- a/airbyte-lib/tests/integration_tests/test_install.py +++ b/airbyte-lib/tests/integration_tests/test_install.py @@ -1,3 +1,5 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + from gettext import install import pytest From f73f288211f2af739c39abef37ffeaa1e15b297a Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 28 Jan 2024 09:11:08 -0800 Subject: [PATCH 20/28] docstring --- airbyte-lib/airbyte_lib/source.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index a68dcdb05b36..f165e04c324e 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -188,7 +188,14 @@ def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: @property def _yaml_spec(self) -> str: - """Get the spec as a yaml string.""" + """Get the spec as a yaml string. + + For now, the primary use case is for writing and debugging a valid config for a source. + + This is private for now because we probably want better polish before exposing this + as a stable interface. This will also get easier when we have docs links with this info + for each connector. + """ spec_obj: ConnectorSpecification = self._get_spec() spec_dict = spec_obj.dict(exclude_unset=True) # convert to a yaml string From dd9ac99d9741632787583c4c43814e28cb987a0a Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 28 Jan 2024 09:11:48 -0800 Subject: [PATCH 21/28] update docs --- airbyte-lib/docs/generated/airbyte_lib.html | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/airbyte-lib/docs/generated/airbyte_lib.html b/airbyte-lib/docs/generated/airbyte_lib.html index dae64a375446..4b0aa9dd9c74 100644 --- a/airbyte-lib/docs/generated/airbyte_lib.html +++ b/airbyte-lib/docs/generated/airbyte_lib.html @@ -479,7 +479,25 @@
Inherited Members

If validate is True, raise an exception if the config fails validation.

-

If validate is False, validation will be deferred until check() is called.

+

If validate is False, validation will be deferred until check() or validate_config() +is called.

+ + + + +
+
+ + def + validate_config(self, config: dict[str, typing.Any] | None = None) -> None: + + +
+ + +

Validate the config against the spec.

+ +

If config is not provided, the already-set config will be validated.

From a2bed019bf30b7127172c6dfb9d1514f7970dedd Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 28 Jan 2024 09:13:58 -0800 Subject: [PATCH 22/28] revert source-github change --- airbyte-integrations/connectors/source-github/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-github/setup.py b/airbyte-integrations/connectors/source-github/setup.py index 88b88cb6df3d..8b5f90f29e12 100644 --- a/airbyte-integrations/connectors/source-github/setup.py +++ b/airbyte-integrations/connectors/source-github/setup.py @@ -16,7 +16,7 @@ author_email="contact@airbyte.io", packages=find_packages(), install_requires=MAIN_REQUIREMENTS, - package_data={"": ["**/*.json"]}, + package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]}, extras_require={ "tests": TEST_REQUIREMENTS, }, From 2e491549e40913c2ff6a0e457937e74bcb3d2ced Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 28 Jan 2024 09:16:40 -0800 Subject: [PATCH 23/28] updated comment --- airbyte-lib/tests/integration_tests/test_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_integration.py index 8a8be464d758..b4a97418c09b 100644 --- a/airbyte-lib/tests/integration_tests/test_integration.py +++ b/airbyte-lib/tests/integration_tests/test_integration.py @@ -596,7 +596,7 @@ def test_install_uninstall(): install_if_missing=False, ) - # Override the install root to a temp dir + # Override the install root to avoid conflicts with the test fixture install_root = Path(temp_dir) source.executor.install_root = install_root From f975282aa0abea749955ee4aee3e718d8146d5db Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 28 Jan 2024 11:38:54 -0800 Subject: [PATCH 24/28] remove redundant strings --- airbyte-lib/tests/integration_tests/test_integration.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_integration.py index b4a97418c09b..04263d7c51b1 100644 --- a/airbyte-lib/tests/integration_tests/test_integration.py +++ b/airbyte-lib/tests/integration_tests/test_integration.py @@ -35,9 +35,9 @@ def prepare_test_env(): """ Prepare test environment. This will pre-install the test source from the fixtures array and set the environment variable to use the local json file as registry. """ - venv_dir = f".venv-source-test" - if os.path.exists(".venv-source-test"): - shutil.rmtree(".venv-source-test") + venv_dir = ".venv-source-test" + if os.path.exists(venv_dir): + shutil.rmtree(venv_dir) subprocess.run(["python", "-m", "venv", venv_dir], check=True) subprocess.run([f"{venv_dir}/bin/pip", "install", "-e", "./tests/integration_tests/fixtures/source-test"], check=True) From 8775c1b8bea950a2e4a52d97d4c76e5955207840 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Sun, 28 Jan 2024 11:58:23 -0800 Subject: [PATCH 25/28] update docs (removes empty cloud page) --- airbyte-lib/docs/generated/airbyte_lib/cloud.html | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 airbyte-lib/docs/generated/airbyte_lib/cloud.html diff --git a/airbyte-lib/docs/generated/airbyte_lib/cloud.html b/airbyte-lib/docs/generated/airbyte_lib/cloud.html deleted file mode 100644 index c0d27ca14eaa..000000000000 --- a/airbyte-lib/docs/generated/airbyte_lib/cloud.html +++ /dev/null @@ -1,7 +0,0 @@ - -
-
- - - - \ No newline at end of file From f24226d49f8d5ed550c12cc7a9f6b0904aad9e50 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 29 Jan 2024 21:33:10 -0800 Subject: [PATCH 26/28] remove unused lock --- airbyte-lib/airbyte_lib/registry.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-lib/airbyte_lib/registry.py b/airbyte-lib/airbyte_lib/registry.py index 4a774336c21e..a7faf64eb919 100644 --- a/airbyte-lib/airbyte_lib/registry.py +++ b/airbyte-lib/airbyte_lib/registry.py @@ -3,7 +3,6 @@ import json import os -import threading from copy import copy from dataclasses import dataclass from pathlib import Path @@ -15,7 +14,7 @@ __cache: dict[str, ConnectorMetadata] | None = None -_cache_lock = threading.Lock() + REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY" REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" From 7370d83b9b4491276584d3d349fa0c06fa342273 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 29 Jan 2024 21:37:47 -0800 Subject: [PATCH 27/28] rename AirbyteConnectoNotFoundError to AirbyteConnectorExecutableNotFoundError to avoid confusion --- airbyte-lib/airbyte_lib/_executor.py | 2 +- airbyte-lib/airbyte_lib/exceptions.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index a9d003ff5cae..a43d56249163 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -392,7 +392,7 @@ def ensure_installation( # TODO: Improve error handling. We should try to distinguish between # a connector that is not installed and a connector that is not # working properly. - raise exc.AirbyteConnectorNotFoundError( + raise exc.AirbyteConnectorExecutableNotFoundError( connector_name=self.name, ) from e diff --git a/airbyte-lib/airbyte_lib/exceptions.py b/airbyte-lib/airbyte_lib/exceptions.py index f270069bf42e..934e936d4d48 100644 --- a/airbyte-lib/airbyte_lib/exceptions.py +++ b/airbyte-lib/airbyte_lib/exceptions.py @@ -192,8 +192,8 @@ class AirbyteConnectorError(AirbyteError): connector_name: str | None = None -class AirbyteConnectorNotFoundError(AirbyteConnectorError): - """Connector name not found in registry.""" +class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError): + """Connector executable not found.""" class AirbyteConnectorInstallationError(AirbyteConnectorError): From 84468388bd530a1912876181e984d175f310bc92 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 29 Jan 2024 21:58:03 -0800 Subject: [PATCH 28/28] allow prereleases in version check --- airbyte-lib/tests/integration_tests/test_integration.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_integration.py index 5a737601c742..5d5586197577 100644 --- a/airbyte-lib/tests/integration_tests/test_integration.py +++ b/airbyte-lib/tests/integration_tests/test_integration.py @@ -198,7 +198,7 @@ def assert_cache_data(expected_test_stream_data: dict[str, list[dict[str, str | pd.DataFrame(expected_test_stream_data[stream_name]), check_dtype=False, ) - + # validate that the cache doesn't contain any other streams if streams: assert len(list(cache.__iter__())) == len(streams) @@ -520,7 +520,9 @@ def test_sync_with_merge_to_postgres(new_pg_cache_config: PostgresCacheConfig, e def test_airbyte_lib_version() -> None: assert get_version() assert isinstance(get_version(), str) - assert len(get_version().split(".")) == 3 + + # Ensure the version is a valid semantic version (x.y.z or x.y.z.alpha0) + assert 3 <= len(get_version().split(".")) <= 4 @patch.dict('os.environ', {'DO_NOT_TRACK': ''})