Feat: Add Cloud Interop and Robust Secrets Management (#143)
aaronsteers authored Apr 10, 2024
1 parent ad19764 commit 3a7ba19
Showing 66 changed files with 3,729 additions and 504 deletions.
11 changes: 7 additions & 4 deletions .github/workflows/python_pytest.yml
@@ -70,9 +70,11 @@ jobs:
# Job-specific step(s):
- name: Run Pytest (No-Creds)
env:
# Force this to an invalid value to ensure that tests not requiring creds can still run.
GCP_GSM_CREDENTIALS: "no-creds"
run: poetry run pytest -m "not requires_creds"
# Force this to a blank value.
GCP_GSM_CREDENTIALS: ""
run: >
poetry run pytest -m
"not requires_creds and not linting and not super_slow"
pytest:
name: Pytest (All, Python ${{ matrix.python-version }}, ${{ matrix.os }})
@@ -114,4 +116,5 @@ jobs:
- name: Run Pytest
env:
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
run: poetry run pytest -m "not linting"
run: >
poetry run pytest -m "not linting and not super_slow"
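The workflows above select tests with pytest's `-m` marker expressions (e.g. `not requires_creds and not super_slow`). As an illustrative sketch only (this is not pytest's actual implementation), such an expression is simply a boolean formula over a test's marker names:

```python
def matches(marker_expr: str, markers: set[str]) -> bool:
    """Evaluate a pytest-style '-m' expression against one test's marker set."""
    # Bind every marker name in the expression to True/False, then evaluate.
    names = {
        word: (word in markers)
        for word in marker_expr.split()
        if word not in {"and", "or", "not"}
    }
    return bool(eval(marker_expr, {"__builtins__": {}}, names))


expr = "not requires_creds and not linting and not super_slow"
print(matches(expr, {"linting"}))  # a linting-marked test is deselected
print(matches(expr, set()))        # an unmarked test is selected
```

This mirrors how a test carrying the `linting` marker is excluded by the expression, while unmarked tests still run.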
2 changes: 1 addition & 1 deletion .github/workflows/test-pr-command.yml
@@ -82,7 +82,7 @@ jobs:
- name: Run Pytest
env:
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
run: poetry run pytest
run: poetry run pytest -m "not super_slow"

log-success-comment:
name: Append 'Success' Comment
10 changes: 8 additions & 2 deletions .gitignore
@@ -1,6 +1,12 @@
# Directories and subdirectories called 'secrets' or '.secrets'
# Packaged docs
docs/*.zip

# Misc
.DS_Store

# Directories and subdirectories called '.secrets' and the top-level '/secrets' directory
.secrets
secrets
/secrets

# Virtual Environments
.venv
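The updated comment distinguishes the unanchored `.secrets` pattern (matches at any depth) from the root-anchored `/secrets` pattern. This can be verified with `git check-ignore` in a throwaway repository:

```shell
tmp=$(mktemp -d) && cd "$tmp" && git init -q .
printf '.secrets\n/secrets\n' > .gitignore
# '.secrets' (no leading slash) matches a path component at any depth:
git check-ignore -q .secrets && echo ".secrets ignored"
git check-ignore -q nested/.secrets && echo "nested/.secrets ignored"
# '/secrets' is anchored to the repository root:
git check-ignore -q secrets && echo "top-level secrets ignored"
git check-ignore -q nested/secrets || echo "nested/secrets NOT ignored"
```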
35 changes: 27 additions & 8 deletions README.md
@@ -1,6 +1,16 @@
# PyAirbyte

PyAirbyte brings the power of Airbyte to every Python developer. PyAirbyte provides a set of utilities to use Airbyte connectors in Python. It is meant to be used in situations where setting up an Airbyte server or cloud account is not possible or desirable.
PyAirbyte brings the power of Airbyte to every Python developer. PyAirbyte provides a set of utilities to use Airbyte connectors in Python.

[![PyPI version](https://badge.fury.io/py/airbyte.svg)](https://badge.fury.io/py/airbyte)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/airbyte)](https://pypi.org/project/airbyte/)
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/airbyte)](https://pypi.org/project/airbyte/)
<!-- [![PyPI - License](https://img.shields.io/pypi/l/airbyte)](https://pypi.org/project/airbyte/) -->
[![PyPI - Wheel](https://img.shields.io/pypi/wheel/airbyte)](https://pypi.org/project/airbyte/)
<!-- [![PyPI - Status](https://img.shields.io/pypi/status/airbyte)](https://pypi.org/project/airbyte/) -->
[![PyPI - Implementation](https://img.shields.io/pypi/implementation/airbyte)](https://pypi.org/project/airbyte/)
[![PyPI - Format](https://img.shields.io/pypi/format/airbyte)](https://pypi.org/project/airbyte/)
[![Star on GitHub](https://img.shields.io/github/stars/airbytehq/pyairbyte.svg?style=social&label=★%20on%20GitHub)](https://github.com/airbytehq/pyairbyte)

- [Getting Started](#getting-started)
- [Secrets Management](#secrets-management)
@@ -29,24 +39,34 @@ PyAirbyte can auto-import secrets from the following sources:
3. [Google Colab secrets](https://medium.com/@parthdasawant/how-to-use-secrets-in-google-colab-450c38e3ec75).
4. Manual entry via [`getpass`](https://docs.python.org/3.9/library/getpass.html).

_Note: Additional secret store options may be supported in the future. [More info here.](https://github.com/airbytehq/airbyte-lib-private-beta/discussions/5)_
_Note: You can also build your own secret manager by subclassing the `CustomSecretManager` implementation. For more information, see the `airbyte.secrets.CustomSecretManager` class definition._
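The note above mentions subclassing `CustomSecretManager`, whose interface is not shown in this diff. The following is a sketch of the pattern under the assumption that a secret manager exposes a `get_secret(name)` lookup method; `SecretManagerBase` and `DictSecretManager` are illustrative stand-ins, not the real PyAirbyte API:

```python
class SecretManagerBase:
    """Stand-in for the assumed airbyte.secrets.CustomSecretManager interface."""

    def get_secret(self, secret_name: str):
        raise NotImplementedError


class DictSecretManager(SecretManagerBase):
    """A custom manager that resolves secrets from an in-memory mapping."""

    def __init__(self, secrets: dict) -> None:
        self._secrets = dict(secrets)

    def get_secret(self, secret_name: str):
        # Return None when the secret is unknown, so other sources can be tried.
        return self._secrets.get(secret_name)


manager = DictSecretManager({"GITHUB_PERSONAL_ACCESS_TOKEN": "ghp_example"})
print(manager.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"))
```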

### Retrieving Secrets

```python
from airbyte import get_secret, SecretSource
import airbyte as ab

source = get_source("source-github")
source = ab.get_source("source-github")
source.set_config(
    {
        "credentials": {
            "personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"),
        },
    },
)
```

The `get_secret()` function accepts an optional `source` argument of enum type `SecretSource`. If omitted or set to `SecretSource.ANY`, PyAirbyte will search all available secrets sources. If `source` is set to a specific source, then only that source will be checked. If a list of `SecretSource` entries is passed, then the sources will be checked using the provided ordering.
By default, PyAirbyte will search all available secrets sources. The `get_secret()` function also accepts an optional `sources` argument of specific source names (`SecretSourceEnum`) and/or secret manager objects to check.

By default, PyAirbyte will prompt the user for any requested secrets that are not provided via other secret managers. You can disable this prompt by passing `prompt=False` to `get_secret()`.
By default, PyAirbyte will prompt the user for any requested secrets that are not provided via other secret managers. You can disable this prompt by passing `allow_prompt=False` to `get_secret()`.

For more information, see the `airbyte.secrets` module.
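The lookup order described above can be sketched as a first-match resolution loop (illustrative only; `resolve_secret` is not the real PyAirbyte implementation):

```python
import getpass


def resolve_secret(name: str, sources: list, allow_prompt: bool = True) -> str:
    """Check each source in the given order; the first non-empty match wins."""
    for source in sources:
        value = source.get(name)
        if value:
            return value
    if allow_prompt:
        # Fall back to an interactive prompt, mirroring allow_prompt=True.
        return getpass.getpass(f"Enter value for secret '{name}': ")
    raise KeyError(f"Secret '{name}' not found in any source.")


env_like = {"SNOWFLAKE_PASSWORD": "example-only"}
print(resolve_secret("SNOWFLAKE_PASSWORD", [env_like], allow_prompt=False))
```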

### Secrets Auto-Discovery

If you have a secret matching an expected name, PyAirbyte will automatically use it. For example, if you have a secret named `GITHUB_PERSONAL_ACCESS_TOKEN`, PyAirbyte will automatically use it when configuring the GitHub source.

Secret names follow the convention `{CONNECTOR_NAME}_{PROPERTY_NAME}`, for instance `SNOWFLAKE_PASSWORD` and `BIGQUERY_CREDENTIALS_PATH`.

PyAirbyte will also auto-discover secrets for interop with hosted Airbyte: `AIRBYTE_CLOUD_API_URL`, `AIRBYTE_CLOUD_API_KEY`, etc.
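The `{CONNECTOR_NAME}_{PROPERTY_NAME}` convention above can be demonstrated with a small helper (illustrative; `discover_secret` is not the real PyAirbyte API):

```python
import os


def discover_secret(connector_name: str, property_name: str):
    """Look up a secret under the conventional '{CONNECTOR_NAME}_{PROPERTY_NAME}' name."""
    env_var = f"{connector_name}_{property_name}".upper()
    return os.environ.get(env_var)


os.environ["SNOWFLAKE_PASSWORD"] = "example-only"  # simulated environment
print(discover_secret("snowflake", "password"))  # reads SNOWFLAKE_PASSWORD
```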

## Connector compatibility

@@ -120,7 +140,6 @@ Yes. Just pick the cache type matching the destination - like SnowflakeCache for
**6. Can PyAirbyte import a connector from a local directory that has Python project files, or does it have to be pip installed?**
Yes, PyAirbyte can use any local install that has a CLI - and will automatically find connectors by name if they are on PATH.


## Changelog and Release Notes

For a version history and list of all changes, please see our [GitHub Releases](https://github.com/airbytehq/PyAirbyte/releases) page.
8 changes: 5 additions & 3 deletions airbyte/__init__.py
@@ -1,3 +1,4 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte brings Airbyte ELT to every Python developer.
.. include:: ../README.md
@@ -7,14 +8,14 @@
"""
from __future__ import annotations

from airbyte import caches, datasets, documents, exceptions, results, secrets, sources
from airbyte import caches, cloud, datasets, documents, exceptions, results, secrets, sources
from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.duckdb import DuckDBCache
from airbyte.caches.util import get_default_cache, new_local_cache
from airbyte.datasets import CachedDataset
from airbyte.records import StreamRecord
from airbyte.results import ReadResult
from airbyte.secrets import SecretSource, get_secret
from airbyte.secrets import SecretSourceEnum, get_secret
from airbyte.sources import registry
from airbyte.sources.base import Source
from airbyte.sources.registry import get_available_connectors
@@ -23,6 +24,7 @@

__all__ = [
# Modules
"cloud",
"caches",
"datasets",
"documents",
@@ -43,7 +45,7 @@
"CachedDataset",
"DuckDBCache",
"ReadResult",
"SecretSource",
"SecretSourceEnum",
"Source",
"StreamRecord",
]
4 changes: 2 additions & 2 deletions airbyte/_executor.py
@@ -47,7 +47,7 @@ def __init__(
The 'name' param is required if 'metadata' is None.
"""
if not name and not metadata:
raise exc.AirbyteLibInternalError(message="Either name or metadata must be provided.")
raise exc.PyAirbyteInternalError(message="Either name or metadata must be provided.")

self.name: str = name or cast(ConnectorMetadata, metadata).name # metadata is not None here
self.metadata: ConnectorMetadata | None = metadata
@@ -270,7 +270,7 @@ def _get_installed_version(
if not self.interpreter_path.exists():
# No point in trying to detect the version if the interpreter does not exist
if raise_on_error:
raise exc.AirbyteLibInternalError(
raise exc.PyAirbyteInternalError(
message="Connector's virtual environment interpreter could not be found.",
context={
"interpreter_path": self.interpreter_path,
8 changes: 4 additions & 4 deletions airbyte/_processors/base.py
@@ -60,7 +60,7 @@ def __init__(
self._expected_streams: set[str] | None = None
self.cache: CacheBase = cache
if not isinstance(self.cache, CacheBase):
raise exc.AirbyteLibInputError(
raise exc.PyAirbyteInputError(
message=(
f"Expected config class of type 'CacheBase'. "
f"Instead received type '{type(self.cache).__name__}'."
@@ -92,7 +92,7 @@ def register_source(
) -> None:
"""Register the source name and catalog."""
if not self._catalog_manager:
raise exc.AirbyteLibInternalError(
raise exc.PyAirbyteInternalError(
message="Catalog manager should exist but does not.",
)
self._catalog_manager.register_source(
@@ -226,7 +226,7 @@ def _finalize_state_messages(
) -> None:
"""Handle state messages by passing them to the catalog manager."""
if not self._catalog_manager:
raise exc.AirbyteLibInternalError(
raise exc.PyAirbyteInternalError(
message="Catalog manager should exist but does not.",
)
if state_messages and self._source_name:
@@ -251,7 +251,7 @@ def _get_stream_config(
) -> ConfiguredAirbyteStream:
"""Return the definition of the given stream."""
if not self._catalog_manager:
raise exc.AirbyteLibInternalError(
raise exc.PyAirbyteInternalError(
message="Catalog manager should exist but does not.",
)

2 changes: 1 addition & 1 deletion airbyte/_processors/file/base.py
@@ -162,7 +162,7 @@ def process_record_message(
batch_handle = self._new_batch(stream_name=stream_name)

if batch_handle.open_file_writer is None:
raise exc.AirbyteLibInternalError(message="Expected open file writer.")
raise exc.PyAirbyteInternalError(message="Expected open file writer.")

self._write_record_dict(
record_dict=StreamRecord.from_record_message(
16 changes: 8 additions & 8 deletions airbyte/_processors/sql/base.py
@@ -296,7 +296,7 @@ def _get_table_by_name(
query. To ignore the cache and force a refresh, set 'force_refresh' to True.
"""
if force_refresh and shallow_okay:
raise exc.AirbyteLibInternalError(
raise exc.PyAirbyteInternalError(
message="Cannot force refresh and use shallow query at the same time."
)

@@ -453,7 +453,7 @@ def _ensure_compatible_table_schema(
]
if missing_columns:
if raise_on_error:
raise exc.AirbyteLibCacheTableValidationError(
raise exc.PyAirbyteCacheTableValidationError(
violation="Cache table is missing expected columns.",
context={
"stream_column_names": stream_column_names,
@@ -666,7 +666,7 @@ def _write_files_to_new_table(

# Pandas will auto-create the table if it doesn't exist, which we don't want.
if not self._table_exists(temp_table_name):
raise exc.AirbyteLibInternalError(
raise exc.PyAirbyteInternalError(
message="Table does not exist after creation.",
context={
"temp_table_name": temp_table_name,
@@ -727,7 +727,7 @@ def _write_temp_table_to_final_table(
has_pks: bool = bool(self._get_primary_keys(stream_name))
has_incremental_key: bool = bool(self._get_incremental_key(stream_name))
if write_strategy == WriteStrategy.MERGE and not has_pks:
raise exc.AirbyteLibInputError(
raise exc.PyAirbyteInputError(
message="Cannot use merge strategy on a stream with no primary keys.",
context={
"stream_name": stream_name,
@@ -783,7 +783,7 @@ def _write_temp_table_to_final_table(
)
return

raise exc.AirbyteLibInternalError(
raise exc.PyAirbyteInternalError(
message="Write strategy is not supported.",
context={
"write_strategy": write_strategy,
@@ -843,9 +843,9 @@ def _swap_temp_table_with_final_table(
Databases that do not support this syntax can override this method.
"""
if final_table_name is None:
raise exc.AirbyteLibInternalError(message="Arg 'final_table_name' cannot be None.")
raise exc.PyAirbyteInternalError(message="Arg 'final_table_name' cannot be None.")
if temp_table_name is None:
raise exc.AirbyteLibInternalError(message="Arg 'temp_table_name' cannot be None.")
raise exc.PyAirbyteInternalError(message="Arg 'temp_table_name' cannot be None.")

_ = stream_name
deletion_name = f"{final_table_name}_deleteme"
@@ -909,7 +909,7 @@ def _get_column_by_name(self, table: str | Table, column_name: str) -> Column:
# Try to get the column in a case-insensitive manner
return next(col for col in table.c if col.name.lower() == column_name.lower())
except StopIteration:
raise exc.AirbyteLibInternalError(
raise exc.PyAirbyteInternalError(
message="Could not find matching column.",
context={
"table": table,
6 changes: 3 additions & 3 deletions airbyte/_processors/sql/bigquery.py
@@ -175,7 +175,7 @@ def _table_exists(
return False

except ValueError as ex:
raise exc.AirbyteLibInputError(
raise exc.PyAirbyteInputError(
message="Invalid project name or dataset name.",
context={
"table_id": table_id,
@@ -225,9 +225,9 @@ def _swap_temp_table_with_final_table(
ALTER TABLE my_schema.my_old_table_name RENAME TO my_new_table_name;
"""
if final_table_name is None:
raise exc.AirbyteLibInternalError(message="Arg 'final_table_name' cannot be None.")
raise exc.PyAirbyteInternalError(message="Arg 'final_table_name' cannot be None.")
if temp_table_name is None:
raise exc.AirbyteLibInternalError(message="Arg 'temp_table_name' cannot be None.")
raise exc.PyAirbyteInternalError(message="Arg 'temp_table_name' cannot be None.")

_ = stream_name
deletion_name = f"{final_table_name}_deleteme"
21 changes: 21 additions & 0 deletions airbyte/_util/api_duck_types.py
@@ -0,0 +1,21 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""A set of duck-typed classes for working with the Airbyte API."""

from __future__ import annotations

from typing import TYPE_CHECKING, Protocol


if TYPE_CHECKING:
import requests


class AirbyteApiResponseDuckType(Protocol):
"""Used for duck-typing various Airbyte API responses."""

content_type: str
r"""HTTP response content type for this operation"""
status_code: int
r"""HTTP response status code for this operation"""
raw_response: requests.Response
r"""Raw HTTP response; suitable for custom response parsing"""