From 29dbb0c7e92807cca9b662b9118485b2ccf1591a Mon Sep 17 00:00:00 2001
From: "Aaron (\"AJ\") Steers"
Date: Mon, 29 Jan 2024 22:07:21 -0800
Subject: [PATCH] AirbyteLib: Installation improvements and improved error
handling (#34572)
---
airbyte-lib/airbyte_lib/_executor.py | 310 +++++++++++++-----
.../_factories/connector_factories.py | 76 +++--
airbyte-lib/airbyte_lib/exceptions.py | 12 +-
airbyte-lib/airbyte_lib/registry.py | 73 +++--
airbyte-lib/airbyte_lib/source.py | 81 ++++-
airbyte-lib/airbyte_lib/validate.py | 10 +-
airbyte-lib/docs/generated/airbyte_lib.html | 46 ++-
.../source-test/source_test/__init__.py | 0
.../tests/integration_tests/test_install.py | 23 ++
.../integration_tests/test_integration.py | 167 +++++++---
10 files changed, 609 insertions(+), 189 deletions(-)
create mode 100644 airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/__init__.py
create mode 100644 airbyte-lib/tests/integration_tests/test_install.py
diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py
index 20899f892006..a43d56249163 100644
--- a/airbyte-lib/airbyte_lib/_executor.py
+++ b/airbyte-lib/airbyte_lib/_executor.py
@@ -1,22 +1,23 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations
+import shlex
import subprocess
import sys
from abc import ABC, abstractmethod
-from contextlib import contextmanager
+from contextlib import contextmanager, suppress
from pathlib import Path
-from typing import IO, TYPE_CHECKING, Any, NoReturn
+from shutil import rmtree
+from typing import IO, TYPE_CHECKING, Any, NoReturn, cast
from airbyte_lib import exceptions as exc
+from airbyte_lib.registry import ConnectorMetadata
from airbyte_lib.telemetry import SourceTelemetryInfo, SourceType
if TYPE_CHECKING:
from collections.abc import Generator, Iterable, Iterator
- from airbyte_lib.registry import ConnectorMetadata
-
_LATEST_VERSION = "latest"
@@ -24,22 +25,37 @@
class Executor(ABC):
def __init__(
self,
- metadata: ConnectorMetadata,
+ *,
+ name: str | None = None,
+ metadata: ConnectorMetadata | None = None,
target_version: str | None = None,
) -> None:
- self.metadata = metadata
- self.enforce_version = target_version is not None
- if target_version is None or target_version == _LATEST_VERSION:
- self.target_version = metadata.latest_available_version
- else:
- self.target_version = target_version
+ """Initialize a connector executor.
+
+ The 'name' param is required if 'metadata' is None.
+ """
+ if not name and not metadata:
+ raise exc.AirbyteLibInternalError(message="Either name or metadata must be provided.")
+
+ self.name: str = name or cast(ConnectorMetadata, metadata).name # metadata is not None here
+ self.metadata: ConnectorMetadata | None = metadata
+ self.enforce_version: bool = target_version is not None
+
+ self.reported_version: str | None = None
+ self.target_version: str | None = None
+ if target_version:
+ if metadata and target_version == _LATEST_VERSION:
+ self.target_version = metadata.latest_available_version
+ else:
+ self.target_version = target_version
@abstractmethod
def execute(self, args: list[str]) -> Iterator[str]:
pass
@abstractmethod
- def ensure_installation(self) -> None:
+ def ensure_installation(self, *, auto_fix: bool = True) -> None:
+ _ = auto_fix
pass
@abstractmethod
@@ -101,71 +117,150 @@ def _stream_from_file(file: IO[str]) -> Generator[str, Any, None]:
# If the exit code is not 0 or -15 (SIGTERM), raise an exception
if exit_code not in (0, -15):
- raise exc.AirbyteSubprocessFailedError(exit_code=exit_code)
+ raise exc.AirbyteSubprocessFailedError(
+ run_args=args,
+ exit_code=exit_code,
+ )
class VenvExecutor(Executor):
def __init__(
self,
- metadata: ConnectorMetadata,
+ name: str | None = None,
+ *,
+ metadata: ConnectorMetadata | None = None,
target_version: str | None = None,
pip_url: str | None = None,
- *,
- install_if_missing: bool = False,
+ install_root: Path | None = None,
) -> None:
- super().__init__(metadata, target_version)
- self.install_if_missing = install_if_missing
+ """Initialize a connector executor that runs a connector in a virtual environment.
+
+ Args:
+ name: The name of the connector.
+ metadata: (Optional.) The metadata of the connector.
+ target_version: (Optional.) The version of the connector to install.
+ pip_url: (Optional.) The pip URL of the connector to install.
+ install_root: (Optional.) The root directory where the virtual environment will be
+ created. If not provided, the current working directory will be used.
+ """
+ super().__init__(name=name, metadata=metadata, target_version=target_version)
# This is a temporary install path that will be replaced with a proper package
# name once they are published.
- # TODO: Replace with `f"airbyte-{self.metadata.name}"`
- self.pip_url = pip_url or f"../airbyte-integrations/connectors/{self.metadata.name}"
+ # TODO: Replace with `f"airbyte-{self.name}"`
+ self.pip_url = pip_url or f"../airbyte-integrations/connectors/{self.name}"
+ self.install_root = install_root or Path.cwd()
def _get_venv_name(self) -> str:
- return f".venv-{self.metadata.name}"
+ return f".venv-{self.name}"
+
+ def _get_venv_path(self) -> Path:
+ return self.install_root / self._get_venv_name()
def _get_connector_path(self) -> Path:
- return Path(self._get_venv_name(), "bin", self.metadata.name)
+ return self._get_venv_path() / "bin" / self.name
def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None:
- result = subprocess.run(args, check=False)
+ result = subprocess.run(
+ args,
+ check=False,
+ stderr=subprocess.PIPE,
+ )
if result.returncode != 0:
- raise exc.AirbyteConnectorInstallationError from exc.AirbyteSubprocessFailedError(
- exit_code=result.returncode
+ raise exc.AirbyteSubprocessFailedError(
+ run_args=args,
+ exit_code=result.returncode,
+ log_text=result.stderr.decode("utf-8"),
)
def uninstall(self) -> None:
- venv_name = self._get_venv_name()
- if Path(venv_name).exists():
- self._run_subprocess_and_raise_on_failure(["rm", "-rf", venv_name])
+ if self._get_venv_path().exists():
+ rmtree(str(self._get_venv_path()))
+
+ self.reported_version = None # Reset the reported version from the previous installation
def install(self) -> None:
- venv_name = self._get_venv_name()
- self._run_subprocess_and_raise_on_failure([sys.executable, "-m", "venv", venv_name])
+ """Install the connector in a virtual environment.
+
+ After installation, the installed version will be stored in self.reported_version.
+ """
+ self._run_subprocess_and_raise_on_failure(
+ [sys.executable, "-m", "venv", str(self._get_venv_path())]
+ )
- pip_path = str(Path(venv_name) / "bin" / "pip")
+ pip_path = str(self._get_venv_path() / "bin" / "pip")
- self._run_subprocess_and_raise_on_failure([pip_path, "install", "-e", self.pip_url])
+ try:
+ self._run_subprocess_and_raise_on_failure(
+ args=[pip_path, "install", *shlex.split(self.pip_url)]
+ )
+ except exc.AirbyteSubprocessFailedError as ex:
+ # If the installation failed, remove the virtual environment
+ # Otherwise, the connector will be considered as installed and the user may not be able
+ # to retry the installation.
+ with suppress(exc.AirbyteSubprocessFailedError):
+ self.uninstall()
- def _get_installed_version(self) -> str:
+ raise exc.AirbyteConnectorInstallationError from ex
+
+ # Assuming the installation succeeded, store the installed version
+ self.reported_version = self._get_installed_version(raise_on_error=False, recheck=True)
+
+ def _get_installed_version(
+ self,
+ *,
+ raise_on_error: bool = False,
+ recheck: bool = False,
+ ) -> str | None:
"""Detect the version of the connector installed.
+ Returns the version string if it can be detected, otherwise None.
+
+ If raise_on_error is True, raise an exception if the version cannot be detected.
+
+ If recheck if False and the version has already been detected, return the cached value.
+
In the venv, we run the following:
> python -c "from importlib.metadata import version; print(version(''))"
"""
- venv_name = self._get_venv_name()
- connector_name = self.metadata.name
- return subprocess.check_output(
- [
- Path(venv_name) / "bin" / "python",
- "-c",
- f"from importlib.metadata import version; print(version('{connector_name}'))",
- ],
- universal_newlines=True,
- ).strip()
+ if not recheck and self.reported_version:
+ return self.reported_version
+
+ connector_name = self.name
+ if not self.interpreter_path.exists():
+ # No point in trying to detect the version if the interpreter does not exist
+ if raise_on_error:
+ raise exc.AirbyteLibInternalError(
+ message="Connector's virtual environment interpreter could not be found.",
+ context={
+ "interpreter_path": self.interpreter_path,
+ },
+ )
+ return None
+
+ try:
+ return subprocess.check_output(
+ [
+ self.interpreter_path,
+ "-c",
+ f"from importlib.metadata import version; print(version('{connector_name}'))",
+ ],
+ universal_newlines=True,
+ ).strip()
+ except Exception:
+ if raise_on_error:
+ raise
+
+ return None
+
+ @property
+ def interpreter_path(self) -> Path:
+ return self._get_venv_path() / "bin" / "python"
def ensure_installation(
self,
+ *,
+ auto_fix: bool = True,
) -> None:
"""Ensure that the connector is installed in a virtual environment.
@@ -176,48 +271,77 @@ def ensure_installation(
Note: Version verification is not supported for connectors installed from a
local path.
"""
- venv_name = f".venv-{self.metadata.name}"
- venv_path = Path(venv_name)
- if not venv_path.exists():
- if not self.install_if_missing:
- raise exc.AirbyteConnectorNotFoundError(
- message="Connector not available and venv does not exist.",
- guidance=(
- "Please ensure the connector is pre-installed or consider enabling "
- "`install_if_missing=True`."
- ),
+ # Store the installed version (or None if not installed)
+ if not self.reported_version:
+ self.reported_version = self._get_installed_version()
+
+ original_installed_version = self.reported_version
+
+ reinstalled = False
+ venv_name = f".venv-{self.name}"
+ if not self._get_venv_path().exists():
+ if not auto_fix:
+ raise exc.AirbyteConnectorInstallationError(
+ message="Virtual environment does not exist.",
+ connector_name=self.name,
+ context={
+ "venv_path": self._get_venv_path(),
+ },
+ )
+
+ # If the venv path does not exist, install.
+ self.install()
+ reinstalled = True
+
+ elif not self._get_connector_path().exists():
+ if not auto_fix:
+ raise exc.AirbyteConnectorInstallationError(
+ message="Could not locate connector executable within the virtual environment.",
+ connector_name=self.name,
context={
- "connector_name": self.metadata.name,
- "venv_name": venv_name,
+ "connector_path": self._get_connector_path(),
},
)
+
+ # If the connector path does not exist, uninstall and re-install.
+ # This is sometimes caused by a failed or partial installation.
+ self.uninstall()
self.install()
+ reinstalled = True
+
+ # By now, everything should be installed. Raise an exception if not.
connector_path = self._get_connector_path()
if not connector_path.exists():
- raise exc.AirbyteConnectorNotFoundError(
- connector_name=self.metadata.name,
+ raise exc.AirbyteConnectorInstallationError(
+ message="Connector's executable could not be found within the virtual environment.",
+ connector_name=self.name,
context={
- "venv_name": venv_name,
+ "connector_path": self._get_connector_path(),
},
) from FileNotFoundError(connector_path)
if self.enforce_version:
- installed_version = self._get_installed_version()
- if installed_version != self.target_version:
- # If the version doesn't match, reinstall
- self.install()
+ version_after_reinstall: str | None = None
+ if self.reported_version != self.target_version:
+ if auto_fix and not reinstalled:
+ # If we haven't already reinstalled above, reinstall now.
+ self.install()
+ reinstalled = True
+
+ if reinstalled:
+ version_after_reinstall = self.reported_version
# Check the version again
- version_after_install = self._get_installed_version()
- if version_after_install != self.target_version:
+ if self.reported_version != self.target_version:
raise exc.AirbyteConnectorInstallationError(
- connector_name=self.metadata.name,
+ message="Connector's reported version does not match the target version.",
+ connector_name=self.name,
context={
"venv_name": venv_name,
"target_version": self.target_version,
- "installed_version": installed_version,
- "version_after_install": version_after_install,
+ "original_installed_version": original_installed_version,
+ "version_after_reinstall": version_after_reinstall,
},
)
@@ -228,33 +352,69 @@ def execute(self, args: list[str]) -> Iterator[str]:
yield from stream
def get_telemetry_info(self) -> SourceTelemetryInfo:
- return SourceTelemetryInfo(self.metadata.name, SourceType.VENV, self.target_version)
+ return SourceTelemetryInfo(
+ name=self.name,
+ type=SourceType.VENV,
+ version=self.reported_version,
+ )
class PathExecutor(Executor):
- def ensure_installation(self) -> None:
+ def __init__(
+ self,
+ name: str | None = None,
+ *,
+ path: Path,
+ target_version: str | None = None,
+ ) -> None:
+ """Initialize a connector executor that runs a connector from a local path.
+
+ If path is simply the name of the connector, it will be expected to exist in the current
+ PATH or in the current working directory.
+ """
+ self.path: Path = path
+ name = name or path.name
+ super().__init__(name=name, target_version=target_version)
+
+ def ensure_installation(
+ self,
+ *,
+ auto_fix: bool = True,
+ ) -> None:
+ """Ensure that the connector executable can be found.
+
+ The auto_fix parameter is ignored for this executor type.
+ """
+ _ = auto_fix
try:
self.execute(["spec"])
except Exception as e:
- raise exc.AirbyteConnectorNotFoundError(
- connector_name=self.metadata.name,
+ # TODO: Improve error handling. We should try to distinguish between
+ # a connector that is not installed and a connector that is not
+ # working properly.
+ raise exc.AirbyteConnectorExecutableNotFoundError(
+ connector_name=self.name,
) from e
def install(self) -> NoReturn:
raise exc.AirbyteConnectorInstallationError(
message="Connector cannot be installed because it is not managed by airbyte-lib.",
- connector_name=self.metadata.name,
+ connector_name=self.name,
)
def uninstall(self) -> NoReturn:
raise exc.AirbyteConnectorInstallationError(
message="Connector cannot be uninstalled because it is not managed by airbyte-lib.",
- connector_name=self.metadata.name,
+ connector_name=self.name,
)
def execute(self, args: list[str]) -> Iterator[str]:
- with _stream_from_subprocess([self.metadata.name, *args]) as stream:
+ with _stream_from_subprocess([str(self.path), *args]) as stream:
yield from stream
def get_telemetry_info(self) -> SourceTelemetryInfo:
- return SourceTelemetryInfo(self.metadata.name, SourceType.LOCAL_INSTALL, version=None)
+ return SourceTelemetryInfo(
+ str(self.name),
+ SourceType.LOCAL_INSTALL,
+ version=self.reported_version,
+ )
diff --git a/airbyte-lib/airbyte_lib/_factories/connector_factories.py b/airbyte-lib/airbyte_lib/_factories/connector_factories.py
index 4dbe8c6f41f0..197ed61142c6 100644
--- a/airbyte-lib/airbyte_lib/_factories/connector_factories.py
+++ b/airbyte-lib/airbyte_lib/_factories/connector_factories.py
@@ -1,11 +1,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations
+import shutil
+from pathlib import Path
from typing import Any
-from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor
-from airbyte_lib.exceptions import AirbyteLibInputError
-from airbyte_lib.registry import get_connector_metadata
+from airbyte_lib import exceptions as exc
+from airbyte_lib._executor import PathExecutor, VenvExecutor
+from airbyte_lib.registry import ConnectorMetadata, get_connector_metadata
from airbyte_lib.source import Source
@@ -15,7 +17,7 @@ def get_connector(
pip_url: str | None = None,
config: dict[str, Any] | None = None,
*,
- use_local_install: bool = False,
+ local_executable: Path | str | None = None,
install_if_missing: bool = True,
) -> Source:
"""Get a connector by name and version.
@@ -29,34 +31,58 @@ def get_connector(
connector name.
config: connector config - if not provided, you need to set it later via the set_config
method.
- use_local_install: whether to use a virtual environment to run the connector. If True, the
- connector is expected to be available on the path (e.g. installed via pip). If False,
- the connector will be installed automatically in a virtual environment.
- install_if_missing: whether to install the connector if it is not available locally. This
- parameter is ignored if use_local_install is True.
+ local_executable: If set, the connector will be assumed to already be installed and will be
+ executed using this path or executable name. Otherwise, the connector will be installed
+ automatically in a virtual environment.
+ install_if_missing: Whether to install the connector if it is not available locally. This
+ parameter is ignored when local_executable is set.
"""
- metadata = get_connector_metadata(name)
- if use_local_install:
+ if local_executable:
if pip_url:
- raise AirbyteLibInputError(
- message="Param 'pip_url' is not supported when 'use_local_install' is True."
+ raise exc.AirbyteLibInputError(
+ message="Param 'pip_url' is not supported when 'local_executable' is set."
)
if version:
- raise AirbyteLibInputError(
- message="Param 'version' is not supported when 'use_local_install' is True."
+ raise exc.AirbyteLibInputError(
+ message="Param 'version' is not supported when 'local_executable' is set."
)
- executor: Executor = PathExecutor(
- metadata=metadata,
- target_version=version,
- )
- else:
- executor = VenvExecutor(
- metadata=metadata,
- target_version=version,
- install_if_missing=install_if_missing,
- pip_url=pip_url,
+ if isinstance(local_executable, str):
+ if "/" in local_executable or "\\" in local_executable:
+ # Assume this is a path
+ local_executable = Path(local_executable).absolute()
+ else:
+ which_executable = shutil.which(local_executable)
+ if which_executable is None:
+ raise FileNotFoundError(local_executable)
+ local_executable = Path(which_executable).absolute()
+
+ return Source(
+ name=name,
+ config=config,
+ executor=PathExecutor(
+ name=name,
+ path=local_executable,
+ ),
)
+
+ metadata: ConnectorMetadata | None = None
+ try:
+ metadata = get_connector_metadata(name)
+ except exc.AirbyteConnectorNotRegisteredError:
+ if not pip_url:
+ # We don't have a pip url or registry entry, so we can't install the connector
+ raise
+
+ executor = VenvExecutor(
+ name=name,
+ metadata=metadata,
+ target_version=version,
+ pip_url=pip_url,
+ )
+ if install_if_missing:
+ executor.ensure_installation()
+
return Source(
executor=executor,
name=name,
diff --git a/airbyte-lib/airbyte_lib/exceptions.py b/airbyte-lib/airbyte_lib/exceptions.py
index 3c6336d03103..934e936d4d48 100644
--- a/airbyte-lib/airbyte_lib/exceptions.py
+++ b/airbyte-lib/airbyte_lib/exceptions.py
@@ -174,6 +174,14 @@ class AirbyteConnectorRegistryError(AirbyteError):
"""Error when accessing the connector registry."""
+@dataclass
+class AirbyteConnectorNotRegisteredError(AirbyteConnectorRegistryError):
+ """Connector not found in registry."""
+
+ connector_name: str | None = None
+ guidance = "Please double check the connector name."
+
+
# Connector Errors
@@ -184,8 +192,8 @@ class AirbyteConnectorError(AirbyteError):
connector_name: str | None = None
-class AirbyteConnectorNotFoundError(AirbyteConnectorError):
- """Connector not found."""
+class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError):
+ """Connector executable not found."""
class AirbyteConnectorInstallationError(AirbyteConnectorError):
diff --git a/airbyte-lib/airbyte_lib/registry.py b/airbyte-lib/airbyte_lib/registry.py
index bd030a867ff0..a7faf64eb919 100644
--- a/airbyte-lib/airbyte_lib/registry.py
+++ b/airbyte-lib/airbyte_lib/registry.py
@@ -3,6 +3,7 @@
import json
import os
+from copy import copy
from dataclasses import dataclass
from pathlib import Path
@@ -12,32 +13,60 @@
from airbyte_lib.version import get_version
+__cache: dict[str, ConnectorMetadata] | None = None
+
+
+REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
+REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
+
+
@dataclass
class ConnectorMetadata:
name: str
latest_available_version: str
-_cache: dict[str, ConnectorMetadata] | None = None
+def _get_registry_url() -> str:
+ if REGISTRY_ENV_VAR in os.environ:
+ return str(os.environ.get(REGISTRY_ENV_VAR))
-REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
+ return REGISTRY_URL
-def _update_cache() -> None:
- global _cache
- if os.environ.get("AIRBYTE_LOCAL_REGISTRY"):
- with Path(str(os.environ.get("AIRBYTE_LOCAL_REGISTRY"))).open() as f:
- data = json.load(f)
- else:
+def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
+ """Return the registry cache."""
+ global __cache
+ if __cache and not force_refresh:
+ return __cache
+
+ registry_url = _get_registry_url()
+ if registry_url.startswith("http"):
response = requests.get(
- REGISTRY_URL, headers={"User-Agent": f"airbyte-lib-{get_version()}"}
+ registry_url, headers={"User-Agent": f"airbyte-lib-{get_version()}"}
)
response.raise_for_status()
data = response.json()
- _cache = {}
+ else:
+ # Assume local file
+ with Path(registry_url).open() as f:
+ data = json.load(f)
+
+ new_cache: dict[str, ConnectorMetadata] = {}
+
for connector in data["sources"]:
name = connector["dockerRepository"].replace("airbyte/", "")
- _cache[name] = ConnectorMetadata(name, connector["dockerImageTag"])
+ new_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"])
+
+ if len(new_cache) == 0:
+ raise exc.AirbyteLibInternalError(
+ message="Connector registry is empty.",
+ context={
+ "registry_url": _get_registry_url(),
+ },
+ )
+
+ __cache = new_cache
+ return __cache
def get_connector_metadata(name: str) -> ConnectorMetadata:
@@ -45,14 +74,20 @@ def get_connector_metadata(name: str) -> ConnectorMetadata:
If the cache is empty, populate by calling update_cache.
"""
- if not _cache:
- _update_cache()
- if not _cache or name not in _cache:
- raise exc.AirbyteLibInputError(
- message="Connector name not found in registry.",
- guidance="Please double check the connector name.",
+ cache = copy(_get_registry_cache())
+ if not cache:
+ raise exc.AirbyteLibInternalError(
+ message="Connector registry could not be loaded.",
+ context={
+ "registry_url": _get_registry_url(),
+ },
+ )
+ if name not in cache:
+ raise exc.AirbyteConnectorNotRegisteredError(
+ connector_name=name,
context={
- "connector_name": name,
+ "registry_url": _get_registry_url(),
+ "available_connectors": sorted(cache.keys()),
},
)
- return _cache[name]
+ return cache[name]
diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py
index ec37e11791dd..4db25b3afae0 100644
--- a/airbyte-lib/airbyte_lib/source.py
+++ b/airbyte-lib/airbyte_lib/source.py
@@ -7,6 +7,7 @@
from typing import TYPE_CHECKING, Any
import jsonschema
+import yaml
from airbyte_protocol.models import (
AirbyteCatalog,
@@ -68,7 +69,13 @@ def __init__(
name: str,
config: dict[str, Any] | None = None,
streams: list[str] | None = None,
+ *,
+ validate: bool = False,
) -> None:
+ """Initialize the source.
+
+ If config is provided, it will be validated against the spec if validate is True.
+ """
self._processed_records = 0
self.executor = executor
self.name = name
@@ -79,7 +86,7 @@ def __init__(
self._spec: ConnectorSpecification | None = None
self._selected_stream_names: list[str] | None = None
if config is not None:
- self.set_config(config)
+ self.set_config(config, validate=validate)
if streams is not None:
self.set_streams(streams)
@@ -102,8 +109,22 @@ def set_streams(self, streams: list[str]) -> None:
)
self._selected_stream_names = streams
- def set_config(self, config: dict[str, Any]) -> None:
- self._validate_config(config)
+ def set_config(
+ self,
+ config: dict[str, Any],
+ *,
+ validate: bool = False,
+ ) -> None:
+ """Set the config for the connector.
+
+ If validate is True, raise an exception if the config fails validation.
+
+ If validate is False, validation will be deferred until check() or validate_config()
+ is called.
+ """
+ if validate:
+ self.validate_config(config)
+
self._config_dict = config
@property
@@ -131,9 +152,13 @@ def _discover(self) -> AirbyteCatalog:
log_text=self._last_log_messages,
)
- def _validate_config(self, config: dict[str, Any]) -> None:
- """Validate the config against the spec."""
+ def validate_config(self, config: dict[str, Any] | None = None) -> None:
+ """Validate the config against the spec.
+
+ If config is not provided, the already-set config will be validated.
+ """
spec = self._get_spec(force_refresh=False)
+ config = self._config if config is None else config
jsonschema.validate(config, spec.connectionSpecification)
def get_available_streams(self) -> list[str]:
@@ -161,6 +186,21 @@ def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
log_text=self._last_log_messages,
)
+ @property
+ def _yaml_spec(self) -> str:
+ """Get the spec as a yaml string.
+
+ For now, the primary use case is for writing and debugging a valid config for a source.
+
+ This is private for now because we probably want better polish before exposing this
+ as a stable interface. This will also get easier when we have docs links with this info
+ for each connector.
+ """
+ spec_obj: ConnectorSpecification = self._get_spec()
+ spec_dict = spec_obj.dict(exclude_unset=True)
+ # convert to a yaml string
+ return yaml.dump(spec_dict)
+
@property
def discovered_catalog(self) -> AirbyteCatalog:
"""Get the raw catalog for the given streams.
@@ -248,17 +288,23 @@ def check(self) -> None:
* Make sure the subprocess is killed when the function returns.
"""
with as_temp_files([self._config]) as [config_file]:
- for msg in self._execute(["check", "--config", config_file]):
- if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
- if msg.connectionStatus.status != Status.FAILED:
- return # Success!
-
- raise exc.AirbyteConnectorCheckFailedError(
- context={
- "message": msg.connectionStatus.message,
- }
- )
- raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
+ try:
+ for msg in self._execute(["check", "--config", config_file]):
+ if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
+ if msg.connectionStatus.status != Status.FAILED:
+ return # Success!
+
+ raise exc.AirbyteConnectorCheckFailedError(
+ context={
+ "message": msg.connectionStatus.message,
+ }
+ )
+ raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
+ except exc.AirbyteConnectorReadError as ex:
+ raise exc.AirbyteConnectorCheckFailedError(
+ message="The connector failed to check the connection.",
+ log_text=ex.log_text,
+ ) from ex
def install(self) -> None:
"""Install the connector if it is not yet installed."""
@@ -338,7 +384,8 @@ def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]:
* Read the output line by line of the subprocess and serialize them AirbyteMessage objects.
Drop if not valid.
"""
- self.executor.ensure_installation()
+ # Fail early if the connector is not installed.
+ self.executor.ensure_installation(auto_fix=False)
try:
self._last_log_messages = []
diff --git a/airbyte-lib/airbyte_lib/validate.py b/airbyte-lib/airbyte_lib/validate.py
index 75eab7e3fd39..551960ed5250 100644
--- a/airbyte-lib/airbyte_lib/validate.py
+++ b/airbyte-lib/airbyte_lib/validate.py
@@ -42,11 +42,16 @@ def _parse_args() -> argparse.Namespace:
def _run_subprocess_and_raise_on_failure(args: list[str]) -> None:
- result = subprocess.run(args, check=False)
+ result = subprocess.run(
+ args,
+ check=False,
+ stderr=subprocess.PIPE,
+ )
if result.returncode != 0:
raise exc.AirbyteSubprocessFailedError(
run_args=args,
exit_code=result.returncode,
+ log_text=result.stderr.decode("utf-8"),
)
@@ -55,7 +60,8 @@ def full_tests(connector_name: str, sample_config: str) -> None:
source = ab.get_connector(
# TODO: FIXME: noqa: SIM115, PTH123
connector_name,
- config=json.load(open(sample_config)), # noqa: SIM115, PTH123
+ config=json.load(open(sample_config)), # noqa: SIM115, PTH123,
+ install_if_missing=False,
)
print("Running check...")
diff --git a/airbyte-lib/docs/generated/airbyte_lib.html b/airbyte-lib/docs/generated/airbyte_lib.html
index c8d9f47128ea..ba7c11e54e3d 100644
--- a/airbyte-lib/docs/generated/airbyte_lib.html
+++ b/airbyte-lib/docs/generated/airbyte_lib.html
@@ -254,7 +254,7 @@ Inherited Members
def
-
get_connector( name: str, version: str | None = None, pip_url: str | None = None, config: dict[str, typing.Any] | None = None, *, use_local_install: bool = False, install_if_missing: bool = True) -> Source:
+
get_connector( name: str, version: str | None = None, pip_url: str | None = None, config: dict[str, typing.Any] | None = None, *, local_executable: pathlib.Path | str | None = None, install_if_missing: bool = True) -> Source:
@@ -271,11 +271,11 @@ Inherited Members
connector name.
config: connector config - if not provided, you need to set it later via the set_config
method.
- use_local_install: whether to use a virtual environment to run the connector. If True, the
- connector is expected to be available on the path (e.g. installed via pip). If False,
- the connector will be installed automatically in a virtual environment.
- install_if_missing: whether to install the connector if it is not available locally. This
- parameter is ignored if use_local_install is True.
+ local_executable: If set, the connector will be assumed to already be installed and will be
+ executed using this path or executable name. Otherwise, the connector will be installed
+ automatically in a virtual environment.
+ install_if_missing: Whether to install the connector if it is not available locally. This
+ parameter is ignored when local_executable is set.
@@ -409,13 +409,17 @@ Inherited Members
- Source( executor: airbyte_lib._executor.Executor, name: str, config: dict[str, typing.Any] | None = None, streams: list[str] | None = None)
+ Source( executor: airbyte_lib._executor.Executor, name: str, config: dict[str, typing.Any] | None = None, streams: list[str] | None = None, *, validate: bool = False)
-
+
Initialize the source.
+
+
If config is provided, it will be validated against the spec if validate is True.
+
+
@@ -465,13 +469,37 @@
Inherited Members
def
- set_config(self, config: dict[str, typing.Any]) -> None:
+ set_config(self, config: dict[str, typing.Any], *, validate: bool = False) -> None:
+
Set the config for the connector.
+
+
If validate is True, raise an exception if the config fails validation.
+
+
If validate is False, validation will be deferred until check() or validate_config()
+is called.
+
+
+
+
+
+
+
+ def
+ validate_config(self, config: dict[str, typing.Any] | None = None) -> None:
+
+
+
+
+
Validate the config against the spec.
+
+
If config is not provided, the already-set config will be validated.
+
+
diff --git a/airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/__init__.py b/airbyte-lib/tests/integration_tests/fixtures/source-test/source_test/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/airbyte-lib/tests/integration_tests/test_install.py b/airbyte-lib/tests/integration_tests/test_install.py
new file mode 100644
index 000000000000..c93801489f37
--- /dev/null
+++ b/airbyte-lib/tests/integration_tests/test_install.py
@@ -0,0 +1,23 @@
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+
+from gettext import install
+import pytest
+
+from airbyte_lib._factories.connector_factories import get_connector
+from airbyte_lib import exceptions as exc
+
+
+def test_install_failure_log_pypi():
+ """Test that the install log is created and contains the expected content."""
+ with pytest.raises(exc.AirbyteConnectorNotRegisteredError):
+ source = get_connector("source-not-found")
+
+ with pytest.raises(exc.AirbyteConnectorInstallationError) as exc_info:
+ source = get_connector(
+ "source-not-found",
+ pip_url="https://pypi.org/project/airbyte-not-found",
+ install_if_missing=True,
+ )
+
+ # Check that the stderr log contains the expected content from a failed pip install
+ assert 'Could not install requirement' in str(exc_info.value.__cause__.log_text)
diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_integration.py
index 5dcd1f54e649..5d5586197577 100644
--- a/airbyte-lib/tests/integration_tests/test_integration.py
+++ b/airbyte-lib/tests/integration_tests/test_integration.py
@@ -3,6 +3,7 @@
from collections.abc import Mapping
import os
import shutil
+import subprocess
from typing import Any
from unittest.mock import Mock, call, patch
import tempfile
@@ -17,7 +18,7 @@
import pytest
from airbyte_lib.caches import PostgresCache, PostgresCacheConfig
-from airbyte_lib.registry import _update_cache
+from airbyte_lib import registry
from airbyte_lib.version import get_version
from airbyte_lib.results import ReadResult
from airbyte_lib.datasets import CachedDataset, LazyDataset, SQLDataset
@@ -28,24 +29,32 @@
import ulid
-@pytest.fixture(scope="module", autouse=True)
+LOCAL_TEST_REGISTRY_URL = "./tests/integration_tests/fixtures/registry.json"
+
+
+@pytest.fixture(scope="package", autouse=True)
def prepare_test_env():
"""
Prepare test environment. This will pre-install the test source from the fixtures array and set the environment variable to use the local json file as registry.
"""
- if os.path.exists(".venv-source-test"):
- shutil.rmtree(".venv-source-test")
+ venv_dir = ".venv-source-test"
+ if os.path.exists(venv_dir):
+ shutil.rmtree(venv_dir)
- os.system("python -m venv .venv-source-test")
- os.system(".venv-source-test/bin/pip install -e ./tests/integration_tests/fixtures/source-test")
+ subprocess.run(["python", "-m", "venv", venv_dir], check=True)
+ subprocess.run([f"{venv_dir}/bin/pip", "install", "-e", "./tests/integration_tests/fixtures/source-test"], check=True)
- os.environ["AIRBYTE_LOCAL_REGISTRY"] = "./tests/integration_tests/fixtures/registry.json"
+ os.environ["AIRBYTE_LOCAL_REGISTRY"] = LOCAL_TEST_REGISTRY_URL
os.environ["DO_NOT_TRACK"] = "true"
+ # Force-refresh the registry cache
+ _ = registry._get_registry_cache(force_refresh=True)
+
yield
shutil.rmtree(".venv-source-test")
+
@pytest.fixture
def expected_test_stream_data() -> dict[str, list[dict[str, str | int]]]:
return {
@@ -58,15 +67,50 @@ def expected_test_stream_data() -> dict[str, list[dict[str, str | int]]]:
],
}
-def test_list_streams(expected_test_stream_data: dict[str, list[dict[str, str | int]]]):
- source = ab.get_connector("source-test", config={"apiKey": "test"})
+def test_registry_get():
+ assert registry._get_registry_url() == LOCAL_TEST_REGISTRY_URL
+ metadata = registry.get_connector_metadata("source-test")
+ assert metadata.name == "source-test"
+ assert metadata.latest_available_version == "0.0.1"
+
+
+def test_list_streams(expected_test_stream_data: dict[str, list[dict[str, str | int]]]):
+ source = ab.get_connector(
+ "source-test", config={"apiKey": "test"}, install_if_missing=False
+ )
assert source.get_available_streams() == list(expected_test_stream_data.keys())
def test_invalid_config():
- with pytest.raises(Exception):
- ab.get_connector("source-test", config={"apiKey": 1234})
+ source = ab.get_connector(
+ "source-test", config={"apiKey": 1234}, install_if_missing=False
+ )
+ with pytest.raises(exc.AirbyteConnectorCheckFailedError):
+ source.check()
+
+
+def test_ensure_installation_detection():
+ """Assert that install isn't called, since the connector is already installed by the fixture."""
+ with patch("airbyte_lib._executor.VenvExecutor.install") as mock_venv_install, \
+ patch("airbyte_lib.source.Source.install") as mock_source_install, \
+ patch("airbyte_lib._executor.VenvExecutor.ensure_installation") as mock_ensure_installed:
+ source = ab.get_connector(
+ "source-test",
+ config={"apiKey": 1234},
+ pip_url="https://pypi.org/project/airbyte-not-found",
+ install_if_missing=True,
+ )
+ assert mock_ensure_installed.call_count == 1
+ assert not mock_venv_install.called
+ assert not mock_source_install.called
+
+
+def test_source_yaml_spec():
+ source = ab.get_connector(
+ "source-test", config={"apiKey": 1234}, install_if_missing=False
+ )
+ assert source._yaml_spec.startswith("connectionSpecification:\n $schema:")
def test_non_existing_connector():
@@ -93,22 +137,35 @@ def test_version_enforcement(raises, latest_available_version, requested_version
In this test, the actually installed version is 0.0.1
"""
- _update_cache()
- from airbyte_lib.registry import _cache
- _cache["source-test"].latest_available_version = latest_available_version
- if raises:
- with pytest.raises(Exception):
- ab.get_connector("source-test", version=requested_version, config={"apiKey": "abc"})
- else:
- ab.get_connector("source-test", version=requested_version, config={"apiKey": "abc"})
-
- # reset
- _cache["source-test"].latest_available_version = "0.0.1"
+ patched_entry = registry.ConnectorMetadata(
+ name="source-test", latest_available_version=latest_available_version
+ )
+ with patch.dict("airbyte_lib.registry.__cache", {"source-test": patched_entry}, clear=False):
+ if raises:
+ with pytest.raises(Exception):
+ source = ab.get_connector(
+ "source-test",
+ version=requested_version,
+ config={"apiKey": "abc"},
+ install_if_missing=False,
+ )
+ source.executor.ensure_installation(auto_fix=False)
+ else:
+ source = ab.get_connector(
+ "source-test",
+ version=requested_version,
+ config={"apiKey": "abc"},
+ install_if_missing=False,
+ )
+ source.executor.ensure_installation(auto_fix=False)
def test_check():
- source = ab.get_connector("source-test", config={"apiKey": "test"})
-
+ source = ab.get_connector(
+ "source-test",
+ config={"apiKey": "test"},
+ install_if_missing=False,
+ )
source.check()
@@ -141,7 +198,7 @@ def assert_cache_data(expected_test_stream_data: dict[str, list[dict[str, str |
pd.DataFrame(expected_test_stream_data[stream_name]),
check_dtype=False,
)
-
+
# validate that the cache doesn't contain any other streams
if streams:
assert len(list(cache.__iter__())) == len(streams)
@@ -459,6 +516,15 @@ def test_sync_with_merge_to_postgres(new_pg_cache_config: PostgresCacheConfig, e
check_dtype=False,
)
+
+def test_airbyte_lib_version() -> None:
+ assert get_version()
+ assert isinstance(get_version(), str)
+
+ # Ensure the version is a valid semantic version (x.y.z or x.y.z.alpha0)
+ assert 3 <= len(get_version().split(".")) <= 4
+
+
@patch.dict('os.environ', {'DO_NOT_TRACK': ''})
@patch('airbyte_lib.telemetry.requests')
@patch('airbyte_lib.telemetry.datetime')
@@ -601,27 +667,48 @@ def test_failing_path_connector():
ab.get_connector("source-test", config={"apiKey": "test"}, use_local_install=True)
def test_succeeding_path_connector():
- old_path = os.environ["PATH"]
+ new_path = f"{os.path.abspath('.venv-source-test/bin')}:{os.environ['PATH']}"
+
+ # Patch the PATH env var to include the test venv bin folder
+ with patch.dict(os.environ, {"PATH": new_path}):
+ source = ab.get_connector(
+ "source-test",
+ config={"apiKey": "test"},
+ local_executable="source-test",
+ )
+ source.check()
- # set path to include the test venv bin folder
- os.environ["PATH"] = f"{os.path.abspath('.venv-source-test/bin')}:{os.environ['PATH']}"
- source = ab.get_connector("source-test", config={"apiKey": "test"}, use_local_install=True)
- source.check()
+def test_install_uninstall():
+ with tempfile.TemporaryDirectory() as temp_dir:
+ source = ab.get_connector(
+ "source-test",
+ pip_url="./tests/integration_tests/fixtures/source-test",
+ config={"apiKey": "test"},
+ install_if_missing=False,
+ )
- os.environ["PATH"] = old_path
+ # Override the install root to avoid conflicts with the test fixture
+ install_root = Path(temp_dir)
+ source.executor.install_root = install_root
-def test_install_uninstall():
- source = ab.get_connector("source-test", pip_url="./tests/integration_tests/fixtures/source-test", config={"apiKey": "test"}, install_if_missing=False)
+ # assert that the venv is gone
+ assert not os.path.exists(install_root / ".venv-source-test")
- source.uninstall()
+ # use which to check if the executable is available
+ assert shutil.which("source-test") is None
- # assert that the venv is gone
- assert not os.path.exists(".venv-source-test")
+ # assert that the connector is not available
+ with pytest.raises(Exception):
+ source.check()
+
+ source.install()
+
+ assert os.path.exists(install_root / ".venv-source-test")
+ assert os.path.exists(install_root / ".venv-source-test/bin/source-test")
- # assert that the connector is not available
- with pytest.raises(Exception):
source.check()
- source.install()
+ source.uninstall()
- source.check()
+ assert not os.path.exists(install_root / ".venv-source-test")
+ assert not os.path.exists(install_root / ".venv-source-test/bin/source-test")