From 3b6b82f73371fd3d9ad3ecbfb2c0c0f0c2879b0c Mon Sep 17 00:00:00 2001 From: Obioma Anomnachi Date: Tue, 9 Jan 2024 18:07:07 +0000 Subject: [PATCH 01/17] Destination Astra --- .../connectors/destination-astra/README.md | 159 ++++++++ .../acceptance-test-config.yml | 5 + .../destination_astra/__init__.py | 8 + .../destination_astra/astra_client.py | 150 ++++++++ .../destination_astra/config.py | 48 +++ .../destination_astra/destination.py | 56 +++ .../destination_astra/indexer.py | 89 +++++ .../destination_astra/spec.json | 357 ++++++++++++++++++ .../connectors/destination-astra/icon.svg | 46 +++ .../integration_tests/integration_test.py | 54 +++ .../connectors/destination-astra/main.py | 11 + .../destination-astra/metadata.yaml | 30 ++ .../destination-astra/requirements.txt | 1 + .../connectors/destination-astra/setup.py | 28 ++ .../unit_tests/destination_test.py | 98 +++++ .../unit_tests/indexer_test.py | 170 +++++++++ .../destination-astra/unit_tests/unit_test.py | 7 + docs/integrations/destinations/astra.md | 64 ++++ 18 files changed, 1381 insertions(+) create mode 100644 airbyte-integrations/connectors/destination-astra/README.md create mode 100644 airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/destination-astra/destination_astra/__init__.py create mode 100644 airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py create mode 100644 airbyte-integrations/connectors/destination-astra/destination_astra/config.py create mode 100644 airbyte-integrations/connectors/destination-astra/destination_astra/destination.py create mode 100644 airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py create mode 100644 airbyte-integrations/connectors/destination-astra/destination_astra/spec.json create mode 100644 airbyte-integrations/connectors/destination-astra/icon.svg create mode 100644 airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py create mode 100644 airbyte-integrations/connectors/destination-astra/main.py create mode 100644 airbyte-integrations/connectors/destination-astra/metadata.yaml create mode 100644 airbyte-integrations/connectors/destination-astra/requirements.txt create mode 100644 airbyte-integrations/connectors/destination-astra/setup.py create mode 100644 airbyte-integrations/connectors/destination-astra/unit_tests/destination_test.py create mode 100644 airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py create mode 100644 airbyte-integrations/connectors/destination-astra/unit_tests/unit_test.py create mode 100644 docs/integrations/destinations/astra.md diff --git a/airbyte-integrations/connectors/destination-astra/README.md b/airbyte-integrations/connectors/destination-astra/README.md new file mode 100644 index 000000000000..2fa995b22593 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/README.md @@ -0,0 +1,159 @@ +# Astra Destination + +This is the repository for the Astra destination connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/destinations/astra). 
+ +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.9.0` + +#### Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/destinations/astra) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_astra/spec.json` file. +Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination astra test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Use `airbyte-ci` to build your connector +The Airbyte way of building this connector is to use our `airbyte-ci` tool. +You can follow install instructions [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#L1). +Then running the following command will build your connector: + +```bash +airbyte-ci connectors --name destination-astra build +``` +Once the command is done, you will find your connector image in your local docker registry: `airbyte/destination-astra:dev`. + +##### Customizing our build process +When contributing on our connector you might need to customize the build process to add a system dependency or set an env var. +You can customize our build process by adding a `build_customization.py` module to your connector. +This module should contain a `pre_connector_install` and `post_connector_install` async function that will mutate the base image and the connector container respectively. +It will be imported at runtime by our build process and the functions will be called if they exist. + +Here is an example of a `build_customization.py` module: +```python +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + # Feel free to check the dagger documentation for more information on the Container object and its methods. 
+ # https://dagger-io.readthedocs.io/en/sdk-python-v0.6.4/ + from dagger import Container + + +async def pre_connector_install(base_image_container: Container) -> Container: + return await base_image_container.with_env_variable("MY_PRE_BUILD_ENV_VAR", "my_pre_build_env_var_value") + +async def post_connector_install(connector_container: Container) -> Container: + return await connector_container.with_env_variable("MY_POST_BUILD_ENV_VAR", "my_post_build_env_var_value") +``` + +#### Build your own connector image +This connector is built using our dynamic built process in `airbyte-ci`. +The base image used to build it is defined within the metadata.yaml file under the `connectorBuildOptions`. +The build logic is defined using [Dagger](https://dagger.io/) [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/pipelines/builds/python_connectors.py). +It does not rely on a Dockerfile. + +If you would like to patch our connector and build your own a simple approach would be to: + +1. Create your own Dockerfile based on the latest version of the connector image. +```Dockerfile +FROM airbyte/destination-astra:latest + +COPY . ./airbyte/integration_code +RUN pip install ./airbyte/integration_code + +# The entrypoint and default env vars are already set in the base image +# ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +# ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] +``` +Please use this as an example. This is not optimized. + +2. Build your image: +```bash +docker build -t airbyte/destination-astra:dev . +# Running the spec command against your patched connector +docker run airbyte/destination-astra:dev spec +```` +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/destination-astra:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-astra:dev check --config /secrets/config.json +# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages +cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-astra:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing + Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Coming soon: + +### Using `airbyte-ci` to run tests +See [airbyte-ci documentation](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#connectors-test-command) + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. 
+We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml b/airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml new file mode 100644 index 000000000000..5c946edf2530 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml @@ -0,0 +1,5 @@ +acceptance_tests: + spec: + tests: + - spec_path: destination_astra/spec.json +connector_image: airbyte/destination-astra:dev \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/__init__.py b/airbyte-integrations/connectors/destination-astra/destination_astra/__init__.py new file mode 100644 index 000000000000..1f125a4276a5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from .destination import DestinationAstra + +__all__ = ["DestinationAstra"] diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py b/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py new file mode 100644 index 000000000000..eec4cc74bebf --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py @@ -0,0 +1,150 @@ +import requests +import json +from typing import Dict, List, Optional + + +class AstraClient: + def __init__( + self, + astra_id: str, + astra_region: str, + astra_application_token: str, + keyspace_name: str, + embedding_dim: int, + similarity_function: str, + ): + self.astra_id = astra_id + self.astra_application_token = astra_application_token + self.astra_region = astra_region + self.keyspace_name = keyspace_name + self.embedding_dim = embedding_dim + self.similarity_function = similarity_function + + self.request_base_url = f"https://{self.astra_id}-{self.astra_region}.apps.astra.datastax.com/api/json/v1/{self.keyspace_name}" + self.request_header = { + "x-cassandra-token": self.astra_application_token, + "Content-Type": "application/json", + } + + def _run_query(self, request_url: str, query: Dict): + try: + response = requests.request("POST", request_url, headers=self.request_header, data=json.dumps(query)) + if response.status_code == 200: + response_dict = json.loads(response.text) + if "errors" in response_dict: + raise Exception(f"Astra DB request error - {response_dict['errors']}") + else: + return response_dict + else: + raise Exception(f"Astra DB not available. 
Status code: {response.status_code}, {response.text}") + except Exception: + raise Exception + + def find_collections(self, include_detail: bool = True): + query = {"findCollections": {"options": {"explain": include_detail}}} + result = self._run_query(self.request_base_url, query) + + return result["status"]["collections"] + + def find_collection(self, collection_name: str): + collections = self.find_collections(False) + return collection_name in collections + + def create_collection(self, collection_name: str, embedding_dim: Optional[int] = None, similarity_function: Optional[str] = None): + query = { + "createCollection": { + "name": collection_name, + "options": { + "vector": { + "dimension": embedding_dim if embedding_dim is not None else self.embedding_dim, + "metric": similarity_function if similarity_function is not None else self.similarity_function, + } + }, + } + } + result = self._run_query(self.request_base_url, query) + + return True if result["status"]["ok"] == 1 else False + + def delete_collection(self, collection_name: str): + query = {"deleteCollection": {"name": collection_name}} + result = self._run_query(self.request_base_url, query) + + return True if result["status"]["ok"] == 1 else False + + def _build_collection_query(self, collection_name: str): + return f"{self.request_base_url}/{collection_name}" + + def find_documents( + self, + collection_name: str, + filter: Optional[Dict] = None, + vector: Optional[List[float]] = None, + limit: Optional[int] = None, + include_vector: Optional[bool] = None, + include_similarity: Optional[bool] = None, + ) -> List[Dict]: + find_query = {} + + if filter is not None: + find_query["filter"] = filter + + if vector is not None: + find_query["sort"] = {"$vector": vector} + + if include_vector is not None and include_vector == False: + find_query["projection"] = {"$vector": 0} + + if limit is not None: + find_query["options"] = {"limit": limit} + + if include_similarity is not None: + if "options" in find_query: + find_query["options"]["includeSimilarity"] = int(include_similarity) + else: + find_query["options"] = {"includeSimilarity": int(include_similarity)} + + query = {"find": find_query} + result = self._run_query(self._build_collection_query(collection_name), query) + return result["data"]["documents"] + + def insert_document(self, collection_name: str, document: Dict) -> str: + query = {"insertOne": {"document": document}} + result = self._run_query(self._build_collection_query(collection_name), query) + + return result["status"]["insertedIds"][0] + + def insert_documents(self, collection_name: str, documents: List[Dict]) -> List[str]: + query = {"insertMany": {"documents": documents}} + result = self._run_query(self._build_collection_query(collection_name), query) + + return result["status"]["insertedIds"] + + def update_document(self, collection_name: str, filter: Dict, update: Dict, upsert: bool = True) -> Dict: + query = {"findOneAndUpdate": {"filter": filter, "update": update, "options": {"returnDocument": "after", "upsert": upsert}}} + result = self._run_query(self._build_collection_query(collection_name), query) + + return result["status"] + + def update_documents(self, collection_name: str, filter: Dict, update: Dict): + query = { + "updateMany": { + "filter": filter, + "update": update, + } + } + result = self._run_query(self._build_collection_query(collection_name), query) + + return result["status"] + + def count_documents(self, collection_name: str): + query = {"countDocuments": {}} + result = 
self._run_query(self._build_collection_query(collection_name), query) + + return result["status"]["count"] + + def delete_documents(self, collection_name: str, filter: Dict) -> int: + query = {"deleteMany": {"filter": filter}} + result = self._run_query(self._build_collection_query(collection_name), query) + + return result["status"]["deletedCount"] diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py new file mode 100644 index 000000000000..4011e5ea1de9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py @@ -0,0 +1,48 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.destinations.vector_db_based.config import VectorDBConfigModel +from pydantic import BaseModel, Field + + +class AstraIndexingModel(BaseModel): + astra_db_app_token: str = Field( + ..., + title="AstraDB Application Token", + airbyte_secret=True, + description="AstraDB Application Token", + ) + astra_db_id: str = Field( + ..., + title="AstraDB Id", + airbyte_secret=True, + description="AstraDB Id", + ) + astra_db_region: str = Field( + ..., + title="AstraDB Region", + description="AstraDB Region", + examples=["us-east1"], + ) + astra_db_keyspace: str = Field( + ..., + title="AstraDB Keyspace", + description="Astra DB Keyspace" + ) + collection: str = Field( + ..., + title="AstraDB collection", + description="AstraDB collection" + ) + + class Config: + title = "Indexing" + schema_extra = { + "description": "Astra DB gives developers the APIs, real-time data and ecosystem integrations to put accurate RAG and Gen AI apps with fewer hallucinations in production.", + "group": "indexing", + } + + +class ConfigModel(VectorDBConfigModel): + indexing: AstraIndexingModel diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py b/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py new file mode 100644 index 000000000000..6fa1bd7ade5b --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py @@ -0,0 +1,56 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
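#
# destination.py wires the connector into Airbyte's vector-db-based CDK: incoming
# records are split into chunks by DocumentProcessor, embedded by the Embedder built
# from the configuration, and written to Astra DB through AstraIndexer (see indexer.py).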
+# + + +from typing import Any, Iterable, Mapping + +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.destinations import Destination +from airbyte_cdk.destinations.vector_db_based.document_processor import DocumentProcessor +from airbyte_cdk.destinations.vector_db_based.embedder import Embedder, create_from_config +from airbyte_cdk.destinations.vector_db_based.indexer import Indexer +from airbyte_cdk.destinations.vector_db_based.writer import Writer +from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Status +from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode +from destination_astra.config import ConfigModel +from destination_astra.indexer import AstraIndexer + +BATCH_SIZE = 100 + + +class DestinationAstra(Destination): + indexer: Indexer + embedder: Embedder + + def _init_indexer(self, config: ConfigModel): + self.embedder = create_from_config(config.embedding, config.processing) + self.indexer = AstraIndexer(config.indexing, self.embedder.embedding_dimensions) + + def write( + self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage] + ) -> Iterable[AirbyteMessage]: + config_model = ConfigModel.parse_obj(config) + self._init_indexer(config_model) + writer = Writer( + config_model.processing, self.indexer, self.embedder, batch_size=BATCH_SIZE, omit_raw_text=config_model.omit_raw_text + ) + yield from writer.write(configured_catalog, input_messages) + + def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + parsed_config = ConfigModel.parse_obj(config) + self._init_indexer(parsed_config) + checks = [self.embedder.check(), self.indexer.check(), DocumentProcessor.check_config(parsed_config.processing)] + errors = [error for error in checks if error is not None] + if len(errors) > 0: + return AirbyteConnectionStatus(status=Status.FAILED, message="\n".join(errors)) + else: + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + + def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification: + return ConnectorSpecification( + documentationUrl="https://docs.airbyte.com/integrations/destinations/astra", + supportsIncremental=True, + supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append, DestinationSyncMode.append_dedup], + connectionSpecification=ConfigModel.schema(), # type: ignore[attr-defined] + ) diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py new file mode 100644 index 000000000000..40f9ead57544 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py @@ -0,0 +1,89 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
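#
# AstraIndexer adapts Airbyte chunks to Astra DB's JSON API via AstraClient:
# pre_sync() creates the collection if needed and clears streams synced in overwrite
# mode, index() inserts documents in batches of PARALLELISM_LIMIT, delete() removes
# records by _ab_record_id, and check() verifies that the collection exists and that
# its vector dimension matches the embedding configuration.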
+# + +import uuid +from typing import Optional + +import urllib3 +from airbyte_cdk.destinations.vector_db_based.document_processor import METADATA_RECORD_ID_FIELD, METADATA_STREAM_FIELD +from airbyte_cdk.destinations.vector_db_based.indexer import Indexer +from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, create_stream_identifier, format_exception +from airbyte_cdk.models.airbyte_protocol import ConfiguredAirbyteCatalog, DestinationSyncMode +from destination_astra.config import AstraIndexingModel +from destination_astra.astra_client import AstraClient + + +# do not flood the server with too many connections in parallel +PARALLELISM_LIMIT = 20 + +MAX_METADATA_SIZE = 40_960 - 10_000 + +MAX_IDS_PER_DELETE = 1000 + + +class AstraIndexer(Indexer): + config: AstraIndexingModel + + def __init__(self, config: AstraIndexingModel, embedding_dimensions: int): + super().__init__(config) + + self.client = AstraClient( + config.astra_db_id, config.astra_db_region, config.astra_db_app_token, config.astra_db_keyspace, embedding_dimensions, "cosine" + ) + + self.embedding_dimensions = embedding_dimensions + + def _create_collection(self): + if self.client.find_collection(self.config.collection) is False: + self.client.create_collection(self.config.collection) + + def pre_sync(self, catalog: ConfiguredAirbyteCatalog): + self._create_collection() + for stream in catalog.streams: + if stream.destination_sync_mode == DestinationSyncMode.overwrite: + self.client.delete_documents( + collection_name=self.config.collection, filter={METADATA_STREAM_FIELD: create_stream_identifier(stream.stream)} + ) + + def index(self, document_chunks, namespace, stream): + docs = [] + for i in range(len(document_chunks)): + chunk = document_chunks[i] + metadata = chunk.metadata + if chunk.page_content is not None: + metadata["text"] = chunk.page_content + doc = { + "_id": str(uuid.uuid4()), + "$vector": chunk.embedding, + **metadata, + } + docs.append(doc) + serial_batches = create_chunks(docs, batch_size=PARALLELISM_LIMIT) + + for batch in serial_batches: + results = [chunk for chunk in batch] + self.client.insert_documents(collection_name=self.config.collection, documents=results) + + def delete(self, delete_ids, namespace, stream): + if len(delete_ids) > 0: + self.client.delete_documents(collection_name=self.config.collection, filter={METADATA_RECORD_ID_FIELD: {"$in": delete_ids}}) + + def check(self) -> Optional[str]: + try: + collections = self.client.find_collections() + collection = next(filter(lambda f: f["name"] == self.config.collection, collections), None) + if collection is None: + return f"{self.config.collection} collection does not exist." + + actual_dimension = collection["options"]["vector"]["dimension"] + if actual_dimension != self.embedding_dimensions: + return f"Your embedding configuration will produce vectors with dimension {self.embedding_dimensions:d}, but your collection is configured with dimension {actual_dimension:d}. Make sure embedding and indexing configurations match." + except Exception as e: + if isinstance(e, urllib3.exceptions.MaxRetryError): + if "Failed to resolve 'apps.astra.datastax.com'" in str(e.reason): + return "Failed to resolve environment, please check whether the credential is correct." 
+ + formatted_exception = format_exception(e) + return formatted_exception + return None diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/spec.json b/airbyte-integrations/connectors/destination-astra/destination_astra/spec.json new file mode 100644 index 000000000000..a6c9e0308ead --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/spec.json @@ -0,0 +1,357 @@ +{ + "documentationUrl": "https://docs.airbyte.com/integrations/destinations/astra", + "supported_destination_sync_modes": ["overwrite", "append", "append_dedup"], + "supportsIncremental": true, + "connectionSpecification": { + "title": "Destination Config", + "description": "The configuration model for the Vector DB based destinations. This model is used to generate the UI for the destination configuration,\nas well as to provide type safety for the configuration passed to the destination.\n\nThe configuration model is composed of four parts:\n* Processing configuration\n* Embedding configuration\n* Indexing configuration\n* Advanced configuration\n\nProcessing, embedding and advanced configuration are provided by this base class, while the indexing configuration is provided by the destination connector in the sub class.", + "type": "object", + "required": ["embedding", "processing", "indexing"], + "properties": { + "indexing": { + "title": "Indexing", + "type": "object", + "properties": { + "astra_db_app_token": { + "title": "AstraDB Application Token", + "description": "AstraDB Application Token", + "airbyte_secret": true, + "type": "string" + }, + "astra_db_id": { + "title": "AstraDB Id", + "description": "AstraDB Id", + "airbyte_secret": true, + "type": "string" + }, + "astra_db_region": { + "title": "AstraDB Region", + "description": "AstraDB Region", + "examples": ["us-east1"], + "type": "string" + }, + "astra_db_keyspace": { + "title": "AstraDB Keyspace", + "description": "Astra DB Keyspace", + "type": "string" + }, + "collection": { + "title": "AstraDB collection", + "description": "AstraDB collection", + "type": "string" + } + }, + "required": ["astra_db_app_token", "astra_db_id", "astra_db_region", "astra_db_keyspace", "collection"], + "description": "Astra DB gives developers the APIs, real-time data and ecosystem integrations to put accurate RAG and Gen AI apps with fewer hallucinations in production.", + "group": "indexing" + }, + "embedding": { + "title": "Embedding", + "description": "Embedding configuration", + "group": "embedding", + "type": "object", + "oneOf": [ + { + "title": "OpenAI", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "openai", + "const": "openai", + "enum": ["openai"], + "type": "string" + }, + "openai_key": { + "title": "OpenAI API key", + "airbyte_secret": true, + "type": "string" + } + }, + "required": ["openai_key", "mode"], + "description": "Use the OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions." + }, + { + "title": "Cohere", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "cohere", + "const": "cohere", + "enum": ["cohere"], + "type": "string" + }, + "cohere_key": { + "title": "Cohere API key", + "airbyte_secret": true, + "type": "string" + } + }, + "required": ["cohere_key", "mode"], + "description": "Use the Cohere API to embed text." 
+ }, + { + "title": "Fake", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "fake", + "const": "fake", + "enum": ["fake"], + "type": "string" + } + }, + "required": ["mode"], + "description": "Use a fake embedding made out of random vectors with 1536 embedding dimensions. This is useful for testing the data pipeline without incurring any costs." + }, + { + "title": "Azure OpenAI", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "azure_openai", + "const": "azure_openai", + "enum": ["azure_openai"], + "type": "string" + }, + "openai_key": { + "title": "Azure OpenAI API key", + "description": "The API key for your Azure OpenAI resource. You can find this in the Azure portal under your Azure OpenAI resource", + "airbyte_secret": true, + "type": "string" + }, + "api_base": { + "title": "Resource base URL", + "description": "The base URL for your Azure OpenAI resource. You can find this in the Azure portal under your Azure OpenAI resource", + "examples": ["https://your-resource-name.openai.azure.com"], + "type": "string" + }, + "deployment": { + "title": "Deployment", + "description": "The deployment for your Azure OpenAI resource. You can find this in the Azure portal under your Azure OpenAI resource", + "examples": ["your-resource-name"], + "type": "string" + } + }, + "required": ["openai_key", "api_base", "deployment", "mode"], + "description": "Use the Azure-hosted OpenAI API to embed text. This option is using the text-embedding-ada-002 model with 1536 embedding dimensions." + }, + { + "title": "OpenAI-compatible", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "openai_compatible", + "const": "openai_compatible", + "enum": ["openai_compatible"], + "type": "string" + }, + "api_key": { + "title": "API key", + "default": "", + "airbyte_secret": true, + "type": "string" + }, + "base_url": { + "title": "Base URL", + "description": "The base URL for your OpenAI-compatible service", + "examples": ["https://your-service-name.com"], + "type": "string" + }, + "model_name": { + "title": "Model name", + "description": "The name of the model to use for embedding", + "default": "text-embedding-ada-002", + "examples": ["text-embedding-ada-002"], + "type": "string" + }, + "dimensions": { + "title": "Embedding dimensions", + "description": "The number of dimensions the embedding model is generating", + "examples": [1536, 384], + "type": "integer" + } + }, + "required": ["base_url", "dimensions", "mode"], + "description": "Use a service that's compatible with the OpenAI API to embed text." + } + ] + }, + "processing": { + "title": "ProcessingConfigModel", + "type": "object", + "properties": { + "chunk_size": { + "title": "Chunk size", + "description": "Size of chunks in tokens to store in vector store (make sure it is not too big for the context if your LLM)", + "minimum": 1, + "maximum": 8191, + "type": "integer" + }, + "chunk_overlap": { + "title": "Chunk overlap", + "description": "Size of overlap between chunks in tokens to store in vector store to better capture relevant context", + "default": 0, + "type": "integer" + }, + "text_fields": { + "title": "Text fields to embed", + "description": "List of fields in the record that should be used to calculate the embedding. The field list is applied to all streams in the same way and non-existing fields are ignored. If none are defined, all fields are considered text fields. 
When specifying text fields, you can access nested fields in the record by using dot notation, e.g. `user.name` will access the `name` field in the `user` object. It's also possible to use wildcards to access all fields in an object, e.g. `users.*.name` will access all `names` fields in all entries of the `users` array.", + "default": [], + "always_show": true, + "examples": ["text", "user.name", "users.*.name"], + "type": "array", + "items": { "type": "string" } + }, + "metadata_fields": { + "title": "Fields to store as metadata", + "description": "List of fields in the record that should be stored as metadata. The field list is applied to all streams in the same way and non-existing fields are ignored. If none are defined, all fields are considered metadata fields. When specifying text fields, you can access nested fields in the record by using dot notation, e.g. `user.name` will access the `name` field in the `user` object. It's also possible to use wildcards to access all fields in an object, e.g. `users.*.name` will access all `names` fields in all entries of the `users` array. When specifying nested paths, all matching values are flattened into an array set to a field named by the path.", + "default": [], + "always_show": true, + "examples": ["age", "user", "user.name"], + "type": "array", + "items": { "type": "string" } + }, + "field_name_mappings": { + "title": "Field name mappings", + "description": "List of fields to rename. Not applicable for nested fields, but can be used to rename fields already flattened via dot notation.", + "default": [], + "type": "array", + "items": { + "title": "FieldNameMappingConfigModel", + "type": "object", + "properties": { + "from_field": { + "title": "From field name", + "description": "The field name in the source", + "type": "string" + }, + "to_field": { + "title": "To field name", + "description": "The field name to use in the destination", + "type": "string" + } + }, + "required": ["from_field", "to_field"] + } + }, + "text_splitter": { + "title": "Text splitter", + "description": "Split text fields into chunks based on the specified method.", + "type": "object", + "oneOf": [ + { + "title": "By Separator", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "separator", + "const": "separator", + "enum": ["separator"], + "type": "string" + }, + "separators": { + "title": "Separators", + "description": "List of separator strings to split text fields by. The separator itself needs to be wrapped in double quotes, e.g. to split by the dot character, use \".\". To split by a newline, use \"\\n\".", + "default": ["\"\\n\\n\"", "\"\\n\"", "\" \"", "\"\""], + "type": "array", + "items": { "type": "string" } + }, + "keep_separator": { + "title": "Keep separator", + "description": "Whether to keep the separator in the resulting chunks", + "default": false, + "type": "boolean" + } + }, + "required": ["mode"], + "description": "Split the text by the list of separators until the chunk size is reached, using the earlier mentioned separators where possible. This is useful for splitting text fields by paragraphs, sentences, words, etc." + }, + { + "title": "By Markdown header", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "markdown", + "const": "markdown", + "enum": ["markdown"], + "type": "string" + }, + "split_level": { + "title": "Split level", + "description": "Level of markdown headers to split text fields by. 
Headings down to the specified level will be used as split points", + "default": 1, + "minimum": 1, + "maximum": 6, + "type": "integer" + } + }, + "required": ["mode"], + "description": "Split the text by Markdown headers down to the specified header level. If the chunk size fits multiple sections, they will be combined into a single chunk." + }, + { + "title": "By Programming Language", + "type": "object", + "properties": { + "mode": { + "title": "Mode", + "default": "code", + "const": "code", + "enum": ["code"], + "type": "string" + }, + "language": { + "title": "Language", + "description": "Split code in suitable places based on the programming language", + "enum": [ + "cpp", + "go", + "java", + "js", + "php", + "proto", + "python", + "rst", + "ruby", + "rust", + "scala", + "swift", + "markdown", + "latex", + "html", + "sol" + ], + "type": "string" + } + }, + "required": ["language", "mode"], + "description": "Split the text by suitable delimiters based on the programming language. This is useful for splitting code into chunks." + } + ] + } + }, + "required": ["chunk_size"], + "group": "processing" + }, + "omit_raw_text": { + "title": "Do not store raw text", + "description": "Do not store the text that gets embedded along with the vector and the metadata in the destination. If set to true, only the vector and the metadata will be stored - in this case raw text for LLM use cases needs to be retrieved from another source.", + "default": false, + "group": "advanced", + "type": "boolean" + } + }, + "groups": [ + { "id": "processing", "title": "Processing" }, + { "id": "embedding", "title": "Embedding" }, + { "id": "indexing", "title": "Indexing" }, + { "id": "advanced", "title": "Advanced" } + ] + } +} diff --git a/airbyte-integrations/connectors/destination-astra/icon.svg b/airbyte-integrations/connectors/destination-astra/icon.svg new file mode 100644 index 000000000000..2d1f6c918ed9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/icon.svg @@ -0,0 +1,46 @@ + + + + + + + + + + diff --git a/airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py b/airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py new file mode 100644 index 000000000000..77179bd19645 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
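#
# These tests run against a live Astra DB instance: they expect real credentials in
# the config loaded by BaseIntegrationTest (typically secrets/config.json), and
# test_write clears every document in the configured collection before writing.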
+# + +import logging + +from airbyte_cdk.destinations.vector_db_based.test_utils import BaseIntegrationTest +from airbyte_cdk.models import DestinationSyncMode, Status +from destination_astra.destination import DestinationAstra + +from destination_astra.astra_client import AstraClient +from destination_astra.config import ConfigModel +from airbyte_cdk.destinations.vector_db_based.embedder import create_from_config + + +class AstraIntegrationTest(BaseIntegrationTest): + + def test_check_valid_config(self): + outcome = DestinationAstra().check(logging.getLogger("airbyte"), self.config) + assert outcome.status == Status.SUCCEEDED + + def test_check_invalid_config(self): + invalid_config = self.config + + invalid_config["embedding"]["openai_key"] = 123 + + outcome = DestinationAstra().check( + logging.getLogger("airbyte"), invalid_config) + assert outcome.status == Status.FAILED + + def test_write(self): + db_config = ConfigModel.parse_obj(self.config) + embedder = create_from_config(db_config.embedding, db_config.processing) + db_creds = db_config.indexing + astra_client = AstraClient( + db_creds.astra_db_id, + db_creds.astra_db_region, + db_creds.astra_db_app_token, + db_creds.astra_db_keyspace, + embedder.embedding_dimensions, + "cosine" + ) + + astra_client.delete_documents(collection_name=db_creds.collection, filter={}) + assert astra_client.count_documents(db_creds.collection) == 0 + + catalog = self._get_configured_catalog(DestinationSyncMode.overwrite) + + message1 = self._record("mystream", "text data 1", 1) + message2 = self._record("mystream", "text data 2", 2) + + outcome = list(DestinationAstra().write(self.config, catalog, [message1, message2])) + assert astra_client.count_documents(db_creds.collection) == 2 + diff --git a/airbyte-integrations/connectors/destination-astra/main.py b/airbyte-integrations/connectors/destination-astra/main.py new file mode 100644 index 000000000000..53b96b2b39ec --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/main.py @@ -0,0 +1,11 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +import sys + +from destination_astra import DestinationAstra + +if __name__ == "__main__": + DestinationAstra().run(sys.argv[1:]) diff --git a/airbyte-integrations/connectors/destination-astra/metadata.yaml b/airbyte-integrations/connectors/destination-astra/metadata.yaml new file mode 100644 index 000000000000..9d14a8789d77 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/metadata.yaml @@ -0,0 +1,30 @@ +data: + allowedHosts: + hosts: + - "*.apps.astra.datastax.com" + registries: + oss: + enabled: false + cloud: + enabled: false + connectorBuildOptions: + # Please update to the latest version of the connector base image. + # Please use the full address with sha256 hash to guarantee build reproducibility. 
+ # https://hub.docker.com/r/airbyte/python-connector-base + baseImage: docker.io/airbyte/python-connector-base:1.0.0@sha256:dd17e347fbda94f7c3abff539be298a65af2d7fc27a307d89297df1081a45c27 + connectorSubtype: database + connectorType: destination + definitionId: 042ce96f-1158-4662-9543-e2ff015be97a + dockerImageTag: 0.1.0 + dockerRepository: airbyte/destination-astra + githubIssueLabel: destination-astra + icon: astra.svg + license: MIT + name: Astra + releaseDate: 2024-01-10 + releaseStage: alpha + supportLevel: community + documentationUrl: https://docs.airbyte.com/integrations/destinations/astra + tags: + - language:python +metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/destination-astra/requirements.txt b/airbyte-integrations/connectors/destination-astra/requirements.txt new file mode 100644 index 000000000000..d6e1198b1ab1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/requirements.txt @@ -0,0 +1 @@ +-e . diff --git a/airbyte-integrations/connectors/destination-astra/setup.py b/airbyte-integrations/connectors/destination-astra/setup.py new file mode 100644 index 000000000000..bad83d6bde73 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/setup.py @@ -0,0 +1,28 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk==0.55.1", + "langchain==0.0.271", + "openai==0.27.9", + "tiktoken==0.4.0", +] + +TEST_REQUIREMENTS = ["pytest~=6.2"] + +setup( + name="destination_astra", + description="Destination implementation for Astra.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/destination-astra/unit_tests/destination_test.py b/airbyte-integrations/connectors/destination-astra/unit_tests/destination_test.py new file mode 100644 index 000000000000..f98f09f3b713 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/unit_tests/destination_test.py @@ -0,0 +1,98 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
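#
# Unit tests for DestinationAstra. The embedder and indexer are replaced with mocks
# (via patch), so these tests run without network access or live Astra credentials.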
+# + +import unittest +from unittest.mock import MagicMock, Mock, patch + +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.models import ConnectorSpecification, Status +from destination_astra.config import ConfigModel +from destination_astra.destination import DestinationAstra + + +class TestDestinationAstra(unittest.TestCase): + def setUp(self): + self.config = { + "processing": {"text_fields": ["str_col"], "metadata_fields": [], "chunk_size": 1000}, + "embedding": {"mode": "openai", "openai_key": "mykey"}, + "indexing": { + "astra_db_app_token": "mytoken", + "astra_db_id": "myid", + "astra_db_region": "myregion", + "astra_db_keyspace": "mykeyspace", + "collection": "mycollection", + }, + } + self.config_model = ConfigModel.parse_obj(self.config) + self.logger = AirbyteLogger() + + @patch("destination_astra.destination.AstraIndexer") + @patch("destination_astra.destination.create_from_config") + def test_check(self, MockedEmbedder, MockedAstraIndexer): + mock_embedder = Mock() + mock_indexer = Mock() + MockedEmbedder.return_value = mock_embedder + MockedAstraIndexer.return_value = mock_indexer + + mock_embedder.check.return_value = None + mock_indexer.check.return_value = None + + destination = DestinationAstra() + result = destination.check(self.logger, self.config) + + self.assertEqual(result.status, Status.SUCCEEDED) + mock_embedder.check.assert_called_once() + mock_indexer.check.assert_called_once() + + @patch("destination_astra.destination.AstraIndexer") + @patch("destination_astra.destination.create_from_config") + def test_check_with_errors(self, MockedEmbedder, MockedAstraIndexer): + mock_embedder = Mock() + mock_indexer = Mock() + MockedEmbedder.return_value = mock_embedder + MockedAstraIndexer.return_value = mock_indexer + + embedder_error_message = "Embedder Error" + indexer_error_message = "Indexer Error" + + mock_embedder.check.return_value = embedder_error_message + mock_indexer.check.return_value = indexer_error_message + + destination = DestinationAstra() + result = destination.check(self.logger, self.config) + + self.assertEqual(result.status, Status.FAILED) + self.assertEqual(result.message, f"{embedder_error_message}\n{indexer_error_message}") + + mock_embedder.check.assert_called_once() + mock_indexer.check.assert_called_once() + + @patch("destination_astra.destination.Writer") + @patch("destination_astra.destination.AstraIndexer") + @patch("destination_astra.destination.create_from_config") + def test_write(self, MockedEmbedder, MockedAstraIndexer, MockedWriter): + mock_embedder = Mock() + mock_indexer = Mock() + MockedEmbedder.return_value = mock_embedder + mock_writer = Mock() + + MockedAstraIndexer.return_value = mock_indexer + MockedWriter.return_value = mock_writer + + mock_writer.write.return_value = [] + + configured_catalog = MagicMock() + input_messages = [] + + destination = DestinationAstra() + list(destination.write(self.config, configured_catalog, input_messages)) + + MockedWriter.assert_called_once_with(self.config_model.processing, mock_indexer, mock_embedder, batch_size=100, omit_raw_text=False) + mock_writer.write.assert_called_once_with(configured_catalog, input_messages) + + def test_spec(self): + destination = DestinationAstra() + result = destination.spec() + + self.assertIsInstance(result, ConnectorSpecification) diff --git a/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py b/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py new file mode 100644 index 000000000000..4c0b4fc0bfcc --- 
/dev/null +++ b/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py @@ -0,0 +1,170 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +from unittest.mock import ANY, MagicMock, Mock, patch +import pytest +import urllib3 +from airbyte_cdk.models import ConfiguredAirbyteCatalog +from destination_astra.config import AstraIndexingModel +from destination_astra.indexer import AstraIndexer + + +def create_astra_indexer(): + config = AstraIndexingModel( + astra_db_app_token="mytoken", + astra_db_id="myid", + astra_db_region="myregion", + astra_db_keyspace="mykeyspace", + collection="mycollection", + ) + indexer = AstraIndexer(config, 3) + + indexer.client.delete_documents = MagicMock() + indexer.client.insert_documents = MagicMock() + indexer.client.find_documents = MagicMock() + + return indexer + + +def create_index_description(collection_name, dimensions): + return {"name": collection_name, "options": {"vector": {"dimension": dimensions, "metric": "cosine"}}} + + +def test_astra_index_upsert_and_delete(): + indexer = create_astra_indexer() + indexer.index( + [ + Mock(page_content="test", metadata={"_ab_stream": "abc"}, embedding=[1, 2, 3]), + Mock(page_content="test2", metadata={"_ab_stream": "abc"}, embedding=[4, 5, 6]), + ], + "ns1", + "some_stream", + ) + indexer.delete(["delete_id1", "delete_id2"], "ns1", "some_stram") + indexer.client.delete_documents.assert_called_with( + collection_name="mycollection", filter={"_ab_record_id": {"$in": ["delete_id1", "delete_id2"]}} + ) + indexer.client.insert_documents.assert_called_with( + collection_name="mycollection", + documents=[ + {"_id": ANY, "$vector": [1, 2, 3], "_ab_stream": "abc", "text": "test"}, + {"_id": ANY, "$vector": [4, 5, 6], "_ab_stream": "abc", "text": "test2"}, + ], + ) + + +def test_astra_index_empty_batch(): + indexer = create_astra_indexer() + indexer.index([], "ns1", "some_stream") + indexer.client.delete_documents.assert_not_called() + indexer.client.insert_documents.assert_not_called() + + +def test_astra_index_upsert_batching(): + indexer = create_astra_indexer() + indexer.index( + [Mock(page_content=f"test {i}", metadata={"_ab_stream": "abc"}, embedding=[i, i, i]) for i in range(50)], + "ns1", + "some_stream", + ) + assert indexer.client.insert_documents.call_count == 3 + for i in range(20): + assert indexer.client.insert_documents.call_args_list[0].kwargs.get("documents")[i] == { + "_id": ANY, + "$vector": [i, i, i], + "_ab_stream": "abc", + "text": f"test {i}", + } + for i in range(20, 40): + assert indexer.client.insert_documents.call_args_list[1].kwargs.get("documents")[i - 20] == { + "_id": ANY, + "$vector": [i, i, i], + "_ab_stream": "abc", + "text": f"test {i}", + } + for i in range(40, 50): + assert indexer.client.insert_documents.call_args_list[2].kwargs.get("documents")[i - 40] == { + "_id": ANY, + "$vector": [i, i, i], + "_ab_stream": "abc", + "text": f"test {i}", + } + + +def generate_catalog(): + return ConfiguredAirbyteCatalog.parse_obj( + { + "streams": [ + { + "stream": { + "name": "example_stream", + "json_schema": {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": {}}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": False, + "default_cursor_field": ["column_name"], + "namespace": "ns1", + }, + "primary_key": [["_id"]], + "sync_mode": "incremental", + "destination_sync_mode": "append_dedup", + }, + { + "stream": { + "name": "example_stream2", + "json_schema": {"$schema": 
"http://json-schema.org/draft-07/schema#", "type": "object", "properties": {}}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": False, + "default_cursor_field": ["column_name"], + "namespace": "ns2", + }, + "primary_key": [["_id"]], + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, + ] + } + ) + + +def test_astra_pre_sync(): + indexer = create_astra_indexer() + indexer.client.find_collection = MagicMock(collection_name="") + indexer.client.find_collection.return_value = True + + indexer.pre_sync(generate_catalog()) + indexer.client.delete_documents.assert_called_with(collection_name="mycollection", filter={"_ab_stream": "ns2_example_stream2"}) + + +@pytest.mark.parametrize( + "collection_name,describe_throws,reported_dimensions,check_succeeds,error_message", + [ + ("mycollection", None, 3, True, None), + ("other_collection", None, 3, False, "mycollection collection does not exist."), + ( + ["mycollection"], + urllib3.exceptions.MaxRetryError(None, "", reason=Exception("Failed to resolve environment, please check whether the credential is correct.")), + 3, + False, + "Failed to resolve environment", + ), + ("mycollection", None, 4, False, "Make sure embedding and indexing configurations match."), + ("mycollection", Exception("describe failed"), 3, False, "describe failed"), + ("mycollection", Exception("describe failed"), 4, False, "describe failed"), + ], +) +def test_astra_check(collection_name, describe_throws, reported_dimensions, check_succeeds, error_message): + indexer = create_astra_indexer() + + indexer.client.find_collections = MagicMock() + indexer.client.find_collections.return_value = [create_index_description(collection_name=collection_name, dimensions=reported_dimensions)] + + if describe_throws: + indexer.client.find_collections.side_effect = describe_throws + else: + indexer.client.find_collections.return_value = [create_index_description(collection_name=collection_name, dimensions=reported_dimensions)] + + result = indexer.check() + if check_succeeds: + assert result is None + else: + assert error_message in result diff --git a/airbyte-integrations/connectors/destination-astra/unit_tests/unit_test.py b/airbyte-integrations/connectors/destination-astra/unit_tests/unit_test.py new file mode 100644 index 000000000000..219ae0142c72 --- /dev/null +++ b/airbyte-integrations/connectors/destination-astra/unit_tests/unit_test.py @@ -0,0 +1,7 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + + +def test_example_method(): + assert True diff --git a/docs/integrations/destinations/astra.md b/docs/integrations/destinations/astra.md new file mode 100644 index 000000000000..81241803b460 --- /dev/null +++ b/docs/integrations/destinations/astra.md @@ -0,0 +1,64 @@ +# Astra Destination + +This page contains the setup guide and reference information for the destination-astra connector. + +## Prerequisites + +#### Minimum Python version required `= 3.9.0` + +## Setup Guide + +#### Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. 
+ +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/destinations/astra) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_astra/spec.json` file. +Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination astra test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Use `airbyte-ci` to build your connector +The Airbyte way of building this connector is to use our `airbyte-ci` tool. +You can follow install instructions [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#L1). +Then running the following command will build your connector: + +```bash +airbyte-ci connectors --name destination-astra build +``` +Once the command is done, you will find your connector image in your local docker registry: `airbyte/destination-astra:dev`. 
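
For orientation, the sketch below shows roughly what each processed chunk looks like once it lands in your Astra collection, based on the indexer in this connector (`destination_astra/indexer.py`). The field values are illustrative placeholders, not output from a real sync.

```python
# Illustrative only: the document shape built by AstraIndexer.index() for each chunk
# before it is passed to AstraClient.insert_documents(). All values are made up.
import uuid

chunk_document = {
    "_id": str(uuid.uuid4()),        # generated per chunk
    "$vector": [0.1, 0.2, 0.3],      # embedding vector, length = configured dimension
    "_ab_stream": "ns1_mystream",    # stream identifier, used to clear overwritten streams
    "_ab_record_id": "mystream_1",   # record identifier, used by delete() for append_dedup syncs
    "text": "the chunk's text content",  # omitted when "Do not store raw text" is enabled
    # ...plus any metadata fields selected in the processing configuration
}
print(chunk_document)
```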
+ +## Supported Sync Modes + +## Supported Streams + +## Changelog +| Version | Date | Pull Request | Subject | +| :------ | :--------- | :------------------------------------------------------- | :-------------------------- | +| 0.1.0 | 2024-01-08 | | Initial Release | From 06b8950222eb8e79012c627ec73ed038a9647252 Mon Sep 17 00:00:00 2001 From: Obioma Anomnachi Date: Wed, 17 Jan 2024 19:12:42 +0000 Subject: [PATCH 02/17] Updating main airbyte requirement --- airbyte-integrations/connectors/destination-astra/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-astra/setup.py b/airbyte-integrations/connectors/destination-astra/setup.py index bad83d6bde73..a6b825fc41be 100644 --- a/airbyte-integrations/connectors/destination-astra/setup.py +++ b/airbyte-integrations/connectors/destination-astra/setup.py @@ -6,7 +6,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk==0.55.1", + "airbyte-cdk[vector-db-based]==0.57.0", "langchain==0.0.271", "openai==0.27.9", "tiktoken==0.4.0", From f0d7c268a8cab4f52324e3d2b4458b6a13688099 Mon Sep 17 00:00:00 2001 From: Obioma Anomnachi Date: Wed, 17 Jan 2024 19:28:33 +0000 Subject: [PATCH 03/17] Removing extraneous reqs --- airbyte-integrations/connectors/destination-astra/setup.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/destination-astra/setup.py b/airbyte-integrations/connectors/destination-astra/setup.py index a6b825fc41be..4dd255dbfd75 100644 --- a/airbyte-integrations/connectors/destination-astra/setup.py +++ b/airbyte-integrations/connectors/destination-astra/setup.py @@ -6,10 +6,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk[vector-db-based]==0.57.0", - "langchain==0.0.271", - "openai==0.27.9", - "tiktoken==0.4.0", + "airbyte-cdk[vector-db-based]==0.57.0" ] TEST_REQUIREMENTS = ["pytest~=6.2"] From 0326741d137f296f9686ca08315e8a91ef8680a8 Mon Sep 17 00:00:00 2001 From: Obioma Anomnachi Date: Wed, 17 Jan 2024 19:28:59 +0000 Subject: [PATCH 04/17] Add create collection to check --- .../destination-astra/destination_astra/destination.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py b/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py index 6fa1bd7ade5b..f8623f85668d 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py @@ -40,6 +40,7 @@ def write( def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: parsed_config = ConfigModel.parse_obj(config) self._init_indexer(parsed_config) + self.indexer._create_collection() checks = [self.embedder.check(), self.indexer.check(), DocumentProcessor.check_config(parsed_config.processing)] errors = [error for error in checks if error is not None] if len(errors) > 0: From 86d3c6744cb2ca728201f18a939a3fcd3ba0b5a5 Mon Sep 17 00:00:00 2001 From: Obioma Anomnachi Date: Wed, 17 Jan 2024 20:28:35 +0000 Subject: [PATCH 05/17] Switch from id and region to endpoint --- .../destination_astra/astra_client.py | 8 +++----- .../destination-astra/destination_astra/config.py | 13 ++++--------- .../destination-astra/destination_astra/indexer.py | 2 +- .../destination-astra/destination_astra/spec.json | 13 ++++--------- 4 files changed, 12 insertions(+), 24 
deletions(-) diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py b/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py index eec4cc74bebf..2e167e749e6f 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py @@ -6,21 +6,19 @@ class AstraClient: def __init__( self, - astra_id: str, - astra_region: str, + astra_endpoint: str, astra_application_token: str, keyspace_name: str, embedding_dim: int, similarity_function: str, ): - self.astra_id = astra_id + self.astra_endpoint = astra_endpoint self.astra_application_token = astra_application_token - self.astra_region = astra_region self.keyspace_name = keyspace_name self.embedding_dim = embedding_dim self.similarity_function = similarity_function - self.request_base_url = f"https://{self.astra_id}-{self.astra_region}.apps.astra.datastax.com/api/json/v1/{self.keyspace_name}" + self.request_base_url = f"{self.astra_endpoint}/api/json/v1/{self.keyspace_name}" self.request_header = { "x-cassandra-token": self.astra_application_token, "Content-Type": "application/json", diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py index 4011e5ea1de9..4f76e841ba83 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py @@ -13,16 +13,11 @@ class AstraIndexingModel(BaseModel): airbyte_secret=True, description="AstraDB Application Token", ) - astra_db_id: str = Field( + astra_db_endpoint: str = Field( ..., - title="AstraDB Id", - airbyte_secret=True, - description="AstraDB Id", - ) - astra_db_region: str = Field( - ..., - title="AstraDB Region", - description="AstraDB Region", + title="AstraDB Endpoint", + description="AstraDB Endpoint", + pattern="^https:\/\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-(af|il|ap|ca|eu|me|sa|us|cn|us-gov|us-iso|us-isob)-(central|north|(north(?:east|west))|south|south(?:east|west)|east|west)([0-9]{1})\\.apps\\.astra\\.datastax\\.com", examples=["us-east1"], ) astra_db_keyspace: str = Field( diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py index 40f9ead57544..e036ee4550d0 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py @@ -29,7 +29,7 @@ def __init__(self, config: AstraIndexingModel, embedding_dimensions: int): super().__init__(config) self.client = AstraClient( - config.astra_db_id, config.astra_db_region, config.astra_db_app_token, config.astra_db_keyspace, embedding_dimensions, "cosine" + config.astra_db_endpoint, config.astra_db_app_token, config.astra_db_keyspace, embedding_dimensions, "cosine" ) self.embedding_dimensions = embedding_dimensions diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/spec.json b/airbyte-integrations/connectors/destination-astra/destination_astra/spec.json index a6c9e0308ead..5d5272f81f74 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/spec.json +++ 
b/airbyte-integrations/connectors/destination-astra/destination_astra/spec.json @@ -18,18 +18,13 @@ "airbyte_secret": true, "type": "string" }, - "astra_db_id": { - "title": "AstraDB Id", - "description": "AstraDB Id", + "astra_db_endpoint": { + "title": "AstraDB Endpoint", + "description": "AstraDB Endpoint", + "pattern": "^https:\/\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-(af|il|ap|ca|eu|me|sa|us|cn|us-gov|us-iso|us-isob)-(central|north|(north(?:east|west))|south|south(?:east|west)|east|west)([0-9]{1})\\.apps\\.astra\\.datastax\\.com", "airbyte_secret": true, "type": "string" }, - "astra_db_region": { - "title": "AstraDB Region", - "description": "AstraDB Region", - "examples": ["us-east1"], - "type": "string" - }, "astra_db_keyspace": { "title": "AstraDB Keyspace", "description": "Astra DB Keyspace", From 6e9c52811495dc9449f2dcf48a56c6e391a7176e Mon Sep 17 00:00:00 2001 From: Obioma Anomnachi Date: Wed, 17 Jan 2024 21:12:46 +0000 Subject: [PATCH 06/17] Updating Astra Destination Docs --- docs/integrations/destinations/astra.md | 82 ++++++++++--------------- 1 file changed, 33 insertions(+), 49 deletions(-) diff --git a/docs/integrations/destinations/astra.md b/docs/integrations/destinations/astra.md index 81241803b460..23530c852f70 100644 --- a/docs/integrations/destinations/astra.md +++ b/docs/integrations/destinations/astra.md @@ -1,62 +1,46 @@ # Astra Destination This page contains the setup guide and reference information for the destination-astra connector. +## Pre-Requisites -## Prerequisites - -#### Minimum Python version required `= 3.9.0` +- An OpenAI, AzureOpenAI, Cohere, etc. API Key ## Setup Guide -#### Activate Virtual Environment and install dependencies -From this connector directory, create a virtual environment: -``` -python -m venv .venv -``` - -This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your -development environment of choice. To activate it from the terminal, run: -``` -source .venv/bin/activate -pip install -r requirements.txt -``` -If you are in an IDE, follow your IDE's instructions to activate the virtualenv. - -Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is -used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. - -#### Create credentials -**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/destinations/astra) -to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_astra/spec.json` file. -Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information. -See `integration_tests/sample_config.json` for a sample config file. - -**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination astra test creds` -and place them into `secrets/config.json`. 
- -### Locally running the connector -``` -python main.py spec -python main.py check --config secrets/config.json -python main.py discover --config secrets/config.json -python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json -``` - -### Locally running the connector docker image - -#### Use `airbyte-ci` to build your connector -The Airbyte way of building this connector is to use our `airbyte-ci` tool. -You can follow install instructions [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#L1). -Then running the following command will build your connector: - -```bash -airbyte-ci connectors --name destination-astra build -``` -Once the command is done, you will find your connector image in your local docker registry: `airbyte/destination-astra:dev`. +#### Set Up an Astra Database + +- Create an Astra account [here](https://astra.datastax.com/signup) +- In the Astra Portal, select Databases in the main navigation. +- Click Create Database. +- In the Create Database dialog, select the Serverless (Vector) deployment type. +- In the Configuration section, enter a name for the new database in the Database name field. +-- Because database names can’t be changed later, it’s best to name your database something meaningful. Database names must start and end with an alphanumeric character, and may contain the following special characters: & + - _ ( ) < > . , @. +- Select your preferred Provider and Region. +-- You can select from a limited number of regions if you’re on the Free plan. Regions with a lock icon require that you upgrade to a Pay As You Go plan. +- Click Create Database. +-- You are redirected to your new database’s Overview screen. Your database starts in Pending status before transitioning to Initializing. You’ll receive a notification once your database is initialized. + +#### Setting up a Vector Collection + +- From the database Overview screen, click on the Data Explorer tab +- Either enter default_namespace into the Airbyte UI under astra_db_keyspace or open the namespace dropdown, create a new namespace, and enter that instead +- Click Create Collection +- Enter a name for the collection +-- Also enter this name into the Airbyte UI as collection +- Enter a vector length under Dimensions +-- This should match with the embedding model you plan to use. The default model for openai is text-embedding-ada-002 which produced vectors of length 1536. 
+- Select a similarity metric +-- Default is cosine + +#### Gathering other credentials + +- Go back to the Overview tab on the Astra UI +- Copy the Endpoint under Database Details and load into Airbyte under the name astra_db_endpoint +- Click generate token, copy the application token and load under astra_db_app_token ## Supported Sync Modes -## Supported Streams +Full Refresh Sync ## Changelog | Version | Date | Pull Request | Subject | From 5f52ca7b92e56f2bc7b863e39128cb3c2a61e05a Mon Sep 17 00:00:00 2001 From: Obioma Anomnachi Date: Fri, 19 Jan 2024 21:27:27 +0000 Subject: [PATCH 07/17] PR Comments except Unit Tests --- .../acceptance-test-config.yml | 2 +- .../destination_astra/config.py | 5 +++-- .../destination_astra/destination.py | 1 - .../destination_astra/indexer.py | 1 + .../integration_tests/integration_test.py | 3 +-- .../spec.json | 7 ++++--- .../unit_tests/destination_test.py | 3 +-- .../unit_tests/indexer_test.py | 3 +-- docs/integrations/destinations/astra.md | 18 +++++------------- 9 files changed, 17 insertions(+), 26 deletions(-) rename airbyte-integrations/connectors/destination-astra/{destination_astra => integration_tests}/spec.json (97%) diff --git a/airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml b/airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml index 5c946edf2530..7efdc25ec779 100644 --- a/airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml @@ -1,5 +1,5 @@ acceptance_tests: spec: tests: - - spec_path: destination_astra/spec.json + - spec_path: integration_tests/spec.json connector_image: airbyte/destination-astra:dev \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py index 4f76e841ba83..e103a9555f8b 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py @@ -16,9 +16,10 @@ class AstraIndexingModel(BaseModel): astra_db_endpoint: str = Field( ..., title="AstraDB Endpoint", + airbyte_secret=True, description="AstraDB Endpoint", - pattern="^https:\/\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-(af|il|ap|ca|eu|me|sa|us|cn|us-gov|us-iso|us-isob)-(central|north|(north(?:east|west))|south|south(?:east|west)|east|west)([0-9]{1})\\.apps\\.astra\\.datastax\\.com", - examples=["us-east1"], + pattern="^https:\\/\\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-[^\\.]*?\\.apps\\.astra\\.datastax\\.com", + examples=["https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com"], ) astra_db_keyspace: str = Field( ..., diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py b/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py index f8623f85668d..6fa1bd7ade5b 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py @@ -40,7 +40,6 @@ def write( def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: parsed_config = ConfigModel.parse_obj(config) self._init_indexer(parsed_config) - self.indexer._create_collection() checks = 
[self.embedder.check(), self.indexer.check(), DocumentProcessor.check_config(parsed_config.processing)] errors = [error for error in checks if error is not None] if len(errors) > 0: diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py index e036ee4550d0..5cb2771058ab 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py @@ -71,6 +71,7 @@ def delete(self, delete_ids, namespace, stream): def check(self) -> Optional[str]: try: + self._create_collection() collections = self.client.find_collections() collection = next(filter(lambda f: f["name"] == self.config.collection, collections), None) if collection is None: diff --git a/airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py b/airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py index 77179bd19645..b46fc5f86cf3 100644 --- a/airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py @@ -33,8 +33,7 @@ def test_write(self): embedder = create_from_config(db_config.embedding, db_config.processing) db_creds = db_config.indexing astra_client = AstraClient( - db_creds.astra_db_id, - db_creds.astra_db_region, + db_creds.astra_db_endpoint, db_creds.astra_db_app_token, db_creds.astra_db_keyspace, embedder.embedding_dimensions, diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/spec.json b/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json similarity index 97% rename from airbyte-integrations/connectors/destination-astra/destination_astra/spec.json rename to airbyte-integrations/connectors/destination-astra/integration_tests/spec.json index 5d5272f81f74..a6314796a6f2 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/spec.json +++ b/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json @@ -21,9 +21,10 @@ "astra_db_endpoint": { "title": "AstraDB Endpoint", "description": "AstraDB Endpoint", - "pattern": "^https:\/\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-(af|il|ap|ca|eu|me|sa|us|cn|us-gov|us-iso|us-isob)-(central|north|(north(?:east|west))|south|south(?:east|west)|east|west)([0-9]{1})\\.apps\\.astra\\.datastax\\.com", + "pattern": "^https:\\/\\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-[^\\.]*?\\.apps\\.astra\\.datastax\\.com", "airbyte_secret": true, - "type": "string" + "type": "string", + "examples": ["https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com"] }, "astra_db_keyspace": { "title": "AstraDB Keyspace", @@ -36,7 +37,7 @@ "type": "string" } }, - "required": ["astra_db_app_token", "astra_db_id", "astra_db_region", "astra_db_keyspace", "collection"], + "required": ["astra_db_app_token", "astra_db_endpoint", "astra_db_keyspace", "collection"], "description": "Astra DB gives developers the APIs, real-time data and ecosystem integrations to put accurate RAG and Gen AI apps with fewer hallucinations in production.", "group": "indexing" }, diff --git a/airbyte-integrations/connectors/destination-astra/unit_tests/destination_test.py 
b/airbyte-integrations/connectors/destination-astra/unit_tests/destination_test.py index f98f09f3b713..f7d1400df709 100644 --- a/airbyte-integrations/connectors/destination-astra/unit_tests/destination_test.py +++ b/airbyte-integrations/connectors/destination-astra/unit_tests/destination_test.py @@ -18,8 +18,7 @@ def setUp(self): "embedding": {"mode": "openai", "openai_key": "mykey"}, "indexing": { "astra_db_app_token": "mytoken", - "astra_db_id": "myid", - "astra_db_region": "myregion", + "astra_db_endpoint": "https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com", "astra_db_keyspace": "mykeyspace", "collection": "mycollection", }, diff --git a/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py b/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py index 4c0b4fc0bfcc..29ae3e5f0ee5 100644 --- a/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py +++ b/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py @@ -12,8 +12,7 @@ def create_astra_indexer(): config = AstraIndexingModel( astra_db_app_token="mytoken", - astra_db_id="myid", - astra_db_region="myregion", + astra_db_endpoint="https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com", astra_db_keyspace="mykeyspace", collection="mycollection", ) diff --git a/docs/integrations/destinations/astra.md b/docs/integrations/destinations/astra.md index 23530c852f70..6a4fc8ab54a5 100644 --- a/docs/integrations/destinations/astra.md +++ b/docs/integrations/destinations/astra.md @@ -20,18 +20,6 @@ This page contains the setup guide and reference information for the destination - Click Create Database. -- You are redirected to your new database’s Overview screen. Your database starts in Pending status before transitioning to Initializing. You’ll receive a notification once your database is initialized. -#### Setting up a Vector Collection - -- From the database Overview screen, click on the Data Explorer tab -- Either enter default_namespace into the Airbyte UI under astra_db_keyspace or open the namespace dropdown, create a new namespace, and enter that instead -- Click Create Collection -- Enter a name for the collection --- Also enter this name into the Airbyte UI as collection -- Enter a vector length under Dimensions --- This should match with the embedding model you plan to use. The default model for openai is text-embedding-ada-002 which produced vectors of length 1536. 
-- Select a similarity metric --- Default is cosine - #### Gathering other credentials - Go back to the Overview tab on the Astra UI @@ -40,7 +28,11 @@ This page contains the setup guide and reference information for the destination ## Supported Sync Modes -Full Refresh Sync +| Feature | Supported?\(Yes/No\) | Notes | +| :----------------------------- | :------------------- | :---- | +| Full Refresh Sync | Yes | | +| Incremental - Append Sync | Yes | | +| Incremental - Append + Deduped | Yes | | ## Changelog | Version | Date | Pull Request | Subject | From c209d349042a74478c73a2591a52f2f9b2ede609 Mon Sep 17 00:00:00 2001 From: Obioma Anomnachi Date: Mon, 22 Jan 2024 16:25:12 +0000 Subject: [PATCH 08/17] Integration and unit test fixes --- .../destination-astra/destination_astra/destination.py | 1 + .../connectors/destination-astra/destination_astra/indexer.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py b/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py index 6fa1bd7ade5b..f8623f85668d 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py @@ -40,6 +40,7 @@ def write( def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: parsed_config = ConfigModel.parse_obj(config) self._init_indexer(parsed_config) + self.indexer._create_collection() checks = [self.embedder.check(), self.indexer.check(), DocumentProcessor.check_config(parsed_config.processing)] errors = [error for error in checks if error is not None] if len(errors) > 0: diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py index 5cb2771058ab..e036ee4550d0 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py @@ -71,7 +71,6 @@ def delete(self, delete_ids, namespace, stream): def check(self) -> Optional[str]: try: - self._create_collection() collections = self.client.find_collections() collection = next(filter(lambda f: f["name"] == self.config.collection, collections), None) if collection is None: From 975b89c8d97bb49683c230313c04e5bfbb6927a9 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Tue, 23 Jan 2024 16:50:28 +0100 Subject: [PATCH 09/17] cleanup --- .../acceptance-test-config.yml | 2 +- .../destination_astra/astra_client.py | 5 +- .../destination_astra/config.py | 12 +- .../destination_astra/indexer.py | 3 +- .../integration_tests/integration_test.py | 5 +- .../integration_tests/spec.json | 169 ++++++++++-------- .../connectors/destination-astra/setup.py | 4 +- .../unit_tests/indexer_test.py | 1 + 8 files changed, 109 insertions(+), 92 deletions(-) diff --git a/airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml b/airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml index 7efdc25ec779..75ab930dc09e 100644 --- a/airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/destination-astra/acceptance-test-config.yml @@ -2,4 +2,4 @@ acceptance_tests: spec: tests: - spec_path: integration_tests/spec.json -connector_image: airbyte/destination-astra:dev \ No newline at end of file 
+connector_image: airbyte/destination-astra:dev diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py b/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py index 2e167e749e6f..ad5023708204 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py @@ -1,7 +1,10 @@ -import requests +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + import json from typing import Dict, List, Optional +import requests + class AstraClient: def __init__( diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py index e103a9555f8b..19b24c1dda01 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py @@ -21,16 +21,8 @@ class AstraIndexingModel(BaseModel): pattern="^https:\\/\\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-[^\\.]*?\\.apps\\.astra\\.datastax\\.com", examples=["https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com"], ) - astra_db_keyspace: str = Field( - ..., - title="AstraDB Keyspace", - description="Astra DB Keyspace" - ) - collection: str = Field( - ..., - title="AstraDB collection", - description="AstraDB collection" - ) + astra_db_keyspace: str = Field(..., title="AstraDB Keyspace", description="Astra DB Keyspace") + collection: str = Field(..., title="AstraDB collection", description="AstraDB collection") class Config: title = "Indexing" diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py index e036ee4550d0..6bd29588059d 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py @@ -10,9 +10,8 @@ from airbyte_cdk.destinations.vector_db_based.indexer import Indexer from airbyte_cdk.destinations.vector_db_based.utils import create_chunks, create_stream_identifier, format_exception from airbyte_cdk.models.airbyte_protocol import ConfiguredAirbyteCatalog, DestinationSyncMode -from destination_astra.config import AstraIndexingModel from destination_astra.astra_client import AstraClient - +from destination_astra.config import AstraIndexingModel # do not flood the server with too many connections in parallel PARALLELISM_LIMIT = 20 diff --git a/airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py b/airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py index b46fc5f86cf3..b9d1aac8ae3c 100644 --- a/airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/destination-astra/integration_tests/integration_test.py @@ -4,13 +4,12 @@ import logging +from airbyte_cdk.destinations.vector_db_based.embedder import create_from_config from airbyte_cdk.destinations.vector_db_based.test_utils import BaseIntegrationTest from airbyte_cdk.models import DestinationSyncMode, Status -from destination_astra.destination import DestinationAstra - from destination_astra.astra_client import AstraClient from destination_astra.config import ConfigModel -from 
airbyte_cdk.destinations.vector_db_based.embedder import create_from_config +from destination_astra.destination import DestinationAstra class AstraIntegrationTest(BaseIntegrationTest): diff --git a/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json b/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json index a6314796a6f2..383b7185dfd6 100644 --- a/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json +++ b/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json @@ -1,46 +1,10 @@ { "documentationUrl": "https://docs.airbyte.com/integrations/destinations/astra", - "supported_destination_sync_modes": ["overwrite", "append", "append_dedup"], - "supportsIncremental": true, "connectionSpecification": { "title": "Destination Config", "description": "The configuration model for the Vector DB based destinations. This model is used to generate the UI for the destination configuration,\nas well as to provide type safety for the configuration passed to the destination.\n\nThe configuration model is composed of four parts:\n* Processing configuration\n* Embedding configuration\n* Indexing configuration\n* Advanced configuration\n\nProcessing, embedding and advanced configuration are provided by this base class, while the indexing configuration is provided by the destination connector in the sub class.", "type": "object", - "required": ["embedding", "processing", "indexing"], "properties": { - "indexing": { - "title": "Indexing", - "type": "object", - "properties": { - "astra_db_app_token": { - "title": "AstraDB Application Token", - "description": "AstraDB Application Token", - "airbyte_secret": true, - "type": "string" - }, - "astra_db_endpoint": { - "title": "AstraDB Endpoint", - "description": "AstraDB Endpoint", - "pattern": "^https:\\/\\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-[^\\.]*?\\.apps\\.astra\\.datastax\\.com", - "airbyte_secret": true, - "type": "string", - "examples": ["https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com"] - }, - "astra_db_keyspace": { - "title": "AstraDB Keyspace", - "description": "Astra DB Keyspace", - "type": "string" - }, - "collection": { - "title": "AstraDB collection", - "description": "AstraDB collection", - "type": "string" - } - }, - "required": ["astra_db_app_token", "astra_db_endpoint", "astra_db_keyspace", "collection"], - "description": "Astra DB gives developers the APIs, real-time data and ecosystem integrations to put accurate RAG and Gen AI apps with fewer hallucinations in production.", - "group": "indexing" - }, "embedding": { "title": "Embedding", "description": "Embedding configuration", @@ -99,8 +63,8 @@ "type": "string" } }, - "required": ["mode"], - "description": "Use a fake embedding made out of random vectors with 1536 embedding dimensions. This is useful for testing the data pipeline without incurring any costs." + "description": "Use a fake embedding made out of random vectors with 1536 embedding dimensions. 
This is useful for testing the data pipeline without incurring any costs.", + "required": ["mode"] }, { "title": "Azure OpenAI", @@ -184,8 +148,8 @@ "chunk_size": { "title": "Chunk size", "description": "Size of chunks in tokens to store in vector store (make sure it is not too big for the context if your LLM)", - "minimum": 1, "maximum": 8191, + "minimum": 1, "type": "integer" }, "chunk_overlap": { @@ -201,7 +165,9 @@ "always_show": true, "examples": ["text", "user.name", "users.*.name"], "type": "array", - "items": { "type": "string" } + "items": { + "type": "string" + } }, "metadata_fields": { "title": "Fields to store as metadata", @@ -210,29 +176,8 @@ "always_show": true, "examples": ["age", "user", "user.name"], "type": "array", - "items": { "type": "string" } - }, - "field_name_mappings": { - "title": "Field name mappings", - "description": "List of fields to rename. Not applicable for nested fields, but can be used to rename fields already flattened via dot notation.", - "default": [], - "type": "array", "items": { - "title": "FieldNameMappingConfigModel", - "type": "object", - "properties": { - "from_field": { - "title": "From field name", - "description": "The field name in the source", - "type": "string" - }, - "to_field": { - "title": "To field name", - "description": "The field name to use in the destination", - "type": "string" - } - }, - "required": ["from_field", "to_field"] + "type": "string" } }, "text_splitter": { @@ -256,7 +201,9 @@ "description": "List of separator strings to split text fields by. The separator itself needs to be wrapped in double quotes, e.g. to split by the dot character, use \".\". To split by a newline, use \"\\n\".", "default": ["\"\\n\\n\"", "\"\\n\"", "\" \"", "\"\""], "type": "array", - "items": { "type": "string" } + "items": { + "type": "string" + } }, "keep_separator": { "title": "Keep separator", @@ -265,8 +212,8 @@ "type": "boolean" } }, - "required": ["mode"], - "description": "Split the text by the list of separators until the chunk size is reached, using the earlier mentioned separators where possible. This is useful for splitting text fields by paragraphs, sentences, words, etc." + "description": "Split the text by the list of separators until the chunk size is reached, using the earlier mentioned separators where possible. This is useful for splitting text fields by paragraphs, sentences, words, etc.", + "required": ["mode"] }, { "title": "By Markdown header", @@ -288,8 +235,8 @@ "type": "integer" } }, - "required": ["mode"], - "description": "Split the text by Markdown headers down to the specified header level. If the chunk size fits multiple sections, they will be combined into a single chunk." + "description": "Split the text by Markdown headers down to the specified header level. If the chunk size fits multiple sections, they will be combined into a single chunk.", + "required": ["mode"] }, { "title": "By Programming Language", @@ -330,6 +277,29 @@ "description": "Split the text by suitable delimiters based on the programming language. This is useful for splitting code into chunks." } ] + }, + "field_name_mappings": { + "title": "Field name mappings", + "description": "List of fields to rename. 
Not applicable for nested fields, but can be used to rename fields already flattened via dot notation.", + "default": [], + "type": "array", + "items": { + "title": "FieldNameMappingConfigModel", + "type": "object", + "properties": { + "from_field": { + "title": "From field name", + "description": "The field name in the source", + "type": "string" + }, + "to_field": { + "title": "To field name", + "description": "The field name to use in the destination", + "type": "string" + } + }, + "required": ["from_field", "to_field"] + } } }, "required": ["chunk_size"], @@ -341,13 +311,68 @@ "default": false, "group": "advanced", "type": "boolean" + }, + "indexing": { + "title": "Indexing", + "type": "object", + "properties": { + "astra_db_app_token": { + "title": "AstraDB Application Token", + "description": "AstraDB Application Token", + "airbyte_secret": true, + "type": "string" + }, + "astra_db_endpoint": { + "title": "AstraDB Endpoint", + "description": "AstraDB Endpoint", + "airbyte_secret": true, + "pattern": "^https:\\/\\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-[^\\.]*?\\.apps\\.astra\\.datastax\\.com", + "examples": [ + "https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com" + ], + "type": "string" + }, + "astra_db_keyspace": { + "title": "AstraDB Keyspace", + "description": "Astra DB Keyspace", + "type": "string" + }, + "collection": { + "title": "AstraDB collection", + "description": "AstraDB collection", + "type": "string" + } + }, + "required": [ + "astra_db_app_token", + "astra_db_endpoint", + "astra_db_keyspace", + "collection" + ], + "description": "Astra DB gives developers the APIs, real-time data and ecosystem integrations to put accurate RAG and Gen AI apps with fewer hallucinations in production.", + "group": "indexing" } }, + "required": ["embedding", "processing", "indexing"], "groups": [ - { "id": "processing", "title": "Processing" }, - { "id": "embedding", "title": "Embedding" }, - { "id": "indexing", "title": "Indexing" }, - { "id": "advanced", "title": "Advanced" } + { + "id": "processing", + "title": "Processing" + }, + { + "id": "embedding", + "title": "Embedding" + }, + { + "id": "indexing", + "title": "Indexing" + }, + { + "id": "advanced", + "title": "Advanced" + } ] - } + }, + "supportsIncremental": true, + "supported_destination_sync_modes": ["overwrite", "append", "append_dedup"] } diff --git a/airbyte-integrations/connectors/destination-astra/setup.py b/airbyte-integrations/connectors/destination-astra/setup.py index 4dd255dbfd75..8bd1a185b52e 100644 --- a/airbyte-integrations/connectors/destination-astra/setup.py +++ b/airbyte-integrations/connectors/destination-astra/setup.py @@ -5,9 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = [ - "airbyte-cdk[vector-db-based]==0.57.0" -] +MAIN_REQUIREMENTS = ["airbyte-cdk[vector-db-based]==0.57.0"] TEST_REQUIREMENTS = ["pytest~=6.2"] diff --git a/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py b/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py index 29ae3e5f0ee5..88de2d68d71e 100644 --- a/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py +++ b/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# from unittest.mock import ANY, MagicMock, Mock, patch + import pytest import urllib3 from airbyte_cdk.models import ConfiguredAirbyteCatalog From 705131c128a3a532abcc718c05905540a3ef53dc Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Tue, 23 Jan 2024 16:51:12 +0100 Subject: [PATCH 10/17] enable --- .../connectors/destination-astra/metadata.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-astra/metadata.yaml b/airbyte-integrations/connectors/destination-astra/metadata.yaml index 9d14a8789d77..c9c245c358d3 100644 --- a/airbyte-integrations/connectors/destination-astra/metadata.yaml +++ b/airbyte-integrations/connectors/destination-astra/metadata.yaml @@ -4,9 +4,9 @@ data: - "*.apps.astra.datastax.com" registries: oss: - enabled: false + enabled: true cloud: - enabled: false + enabled: true connectorBuildOptions: # Please update to the latest version of the connector base image. # Please use the full address with sha256 hash to guarantee build reproducibility. From 276da692e6d3c30680c21d26457d2c7fe85b7ec5 Mon Sep 17 00:00:00 2001 From: Obioma Anomnachi Date: Tue, 23 Jan 2024 17:46:24 +0000 Subject: [PATCH 11/17] Remove airbyte_secret from endpoint --- .../connectors/destination-astra/destination_astra/config.py | 1 - .../connectors/destination-astra/integration_tests/spec.json | 1 - 2 files changed, 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py index e103a9555f8b..aa41dc51277c 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py @@ -16,7 +16,6 @@ class AstraIndexingModel(BaseModel): astra_db_endpoint: str = Field( ..., title="AstraDB Endpoint", - airbyte_secret=True, description="AstraDB Endpoint", pattern="^https:\\/\\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-[^\\.]*?\\.apps\\.astra\\.datastax\\.com", examples=["https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com"], diff --git a/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json b/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json index a6314796a6f2..758d8ce327bd 100644 --- a/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json +++ b/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json @@ -22,7 +22,6 @@ "title": "AstraDB Endpoint", "description": "AstraDB Endpoint", "pattern": "^https:\\/\\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-[^\\.]*?\\.apps\\.astra\\.datastax\\.com", - "airbyte_secret": true, "type": "string", "examples": ["https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com"] }, From c17b7c1fe339462a2fc93455f3be48931fec752c Mon Sep 17 00:00:00 2001 From: Obioma Anomnachi Date: Wed, 24 Jan 2024 20:11:12 +0000 Subject: [PATCH 12/17] Move create collection indexer and Readable error message --- .../destination-astra/destination_astra/astra_client.py | 5 +++-- .../destination-astra/destination_astra/destination.py | 1 - .../destination-astra/destination_astra/indexer.py | 3 +++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py 
b/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py index 2e167e749e6f..6df98dba04f8 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py @@ -1,6 +1,7 @@ import requests import json from typing import Dict, List, Optional +import urllib3 class AstraClient: @@ -34,9 +35,9 @@ def _run_query(self, request_url: str, query: Dict): else: return response_dict else: - raise Exception(f"Astra DB not available. Status code: {response.status_code}, {response.text}") + raise urllib3.exceptions.HTTPError(f"Astra DB not available. Status code: {response.status_code}, {response.text}") except Exception: - raise Exception + raise def find_collections(self, include_detail: bool = True): query = {"findCollections": {"options": {"explain": include_detail}}} diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py b/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py index f8623f85668d..6fa1bd7ade5b 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/destination.py @@ -40,7 +40,6 @@ def write( def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: parsed_config = ConfigModel.parse_obj(config) self._init_indexer(parsed_config) - self.indexer._create_collection() checks = [self.embedder.check(), self.indexer.check(), DocumentProcessor.check_config(parsed_config.processing)] errors = [error for error in checks if error is not None] if len(errors) > 0: diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py index e036ee4550d0..d38d55fea2b9 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/indexer.py @@ -71,6 +71,7 @@ def delete(self, delete_ids, namespace, stream): def check(self) -> Optional[str]: try: + self._create_collection() collections = self.client.find_collections() collection = next(filter(lambda f: f["name"] == self.config.collection, collections), None) if collection is None: @@ -83,6 +84,8 @@ def check(self) -> Optional[str]: if isinstance(e, urllib3.exceptions.MaxRetryError): if "Failed to resolve 'apps.astra.datastax.com'" in str(e.reason): return "Failed to resolve environment, please check whether the credential is correct." 
+ if isinstance(e, urllib3.exceptions.HTTPError): + return str(e) formatted_exception = format_exception(e) return formatted_exception From 4d2f5f455a6e5626e228590d823471540e2fc286 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Thu, 25 Jan 2024 09:39:44 +0100 Subject: [PATCH 13/17] fix some things --- .../destination-astra/destination_astra/astra_client.py | 2 +- .../connectors/destination-astra/unit_tests/indexer_test.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py b/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py index 5fec0faaf71d..527c8345daa0 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/astra_client.py @@ -2,9 +2,9 @@ import json from typing import Dict, List, Optional -import urllib3 import requests +import urllib3 class AstraClient: diff --git a/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py b/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py index 88de2d68d71e..ebb1d41e230a 100644 --- a/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py +++ b/airbyte-integrations/connectors/destination-astra/unit_tests/indexer_test.py @@ -155,6 +155,7 @@ def test_astra_pre_sync(): def test_astra_check(collection_name, describe_throws, reported_dimensions, check_succeeds, error_message): indexer = create_astra_indexer() + indexer.client.create_collection = MagicMock() indexer.client.find_collections = MagicMock() indexer.client.find_collections.return_value = [create_index_description(collection_name=collection_name, dimensions=reported_dimensions)] From 149c8ad20f328f51866f06ed1f3c0571b75ad836 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Thu, 25 Jan 2024 09:57:30 +0100 Subject: [PATCH 14/17] update docs --- docs/integrations/destinations/astra.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/destinations/astra.md b/docs/integrations/destinations/astra.md index 6a4fc8ab54a5..554368588307 100644 --- a/docs/integrations/destinations/astra.md +++ b/docs/integrations/destinations/astra.md @@ -1,6 +1,7 @@ # Astra Destination This page contains the setup guide and reference information for the destination-astra connector. + ## Pre-Requisites - An OpenAI, AzureOpenAI, Cohere, etc. 
API Key From d5e766d6a96580badae33c951b96eef05295f6c7 Mon Sep 17 00:00:00 2001 From: Obioma Anomnachi Date: Fri, 26 Jan 2024 16:43:05 +0000 Subject: [PATCH 15/17] DS Branding and Tooltips --- .../destination_astra/config.py | 12 ++-- .../connectors/destination-astra/icon.svg | 57 ++++--------------- .../integration_tests/spec.json | 19 ++++--- .../destination-astra/metadata.yaml | 6 +- docs/integrations/destinations/astra.md | 3 +- 5 files changed, 33 insertions(+), 64 deletions(-) diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py index 7606aab4f1c5..89d8a8b6cc51 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py @@ -9,19 +9,19 @@ class AstraIndexingModel(BaseModel): astra_db_app_token: str = Field( ..., - title="AstraDB Application Token", + title="Astra DB Application Token", airbyte_secret=True, - description="AstraDB Application Token", + description="The application token authorizes a user to connect to a specific Astra DB database. It is created when the user clicks the Generate Token button on the Overview tab of the Database page in the Astra UI.", ) astra_db_endpoint: str = Field( ..., - title="AstraDB Endpoint", - description="AstraDB Endpoint", + title="Astra DB Endpoint", + description="The endpoint specifies which Astra DB database queries are sent to. It can be copied from the Database Details section of the Overview tab of the Database page in the Astra UI.", pattern="^https:\\/\\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-[^\\.]*?\\.apps\\.astra\\.datastax\\.com", examples=["https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com"], ) - astra_db_keyspace: str = Field(..., title="AstraDB Keyspace", description="Astra DB Keyspace") - collection: str = Field(..., title="AstraDB collection", description="AstraDB collection") + astra_db_keyspace: str = Field(..., title="Astra DB Keyspace", description="Keyspaces (or Namespaces) serve as containers for organizing data within a database. You can create a new keyspace uisng the Data Explorer tab in the Astra UI. The keyspace default_keyspace is created for you when you create a Vector Database in Astra DB.") + collection: str = Field(..., title="Astra DB collection", description="Collections hold data. They are analagous to tables in traditional Cassandra terminology. This tool will create the collection with the provided name automatically if it does not already exist. 
Alternatively, you can create one thorugh the Data Explorer tab in the Astra UI.") class Config: title = "Indexing" diff --git a/airbyte-integrations/connectors/destination-astra/icon.svg b/airbyte-integrations/connectors/destination-astra/icon.svg index 2d1f6c918ed9..9ba7973201a2 100644 --- a/airbyte-integrations/connectors/destination-astra/icon.svg +++ b/airbyte-integrations/connectors/destination-astra/icon.svg @@ -1,46 +1,13 @@ - - - - - - - - - + + + + + + + + + + + + diff --git a/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json b/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json index a94caed893cf..4ce20decb60f 100644 --- a/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json +++ b/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json @@ -317,14 +317,14 @@ "type": "object", "properties": { "astra_db_app_token": { - "title": "AstraDB Application Token", - "description": "AstraDB Application Token", + "title": "Astra DB Application Token", + "description": "The application token authorizes a user to connect to a specific Astra DB database. It is created when the user clicks the Generate Token button on the Overview tab of the Database page in the Astra UI.", "airbyte_secret": true, "type": "string" }, "astra_db_endpoint": { - "title": "AstraDB Endpoint", - "description": "AstraDB Endpoint", + "title": "Astra DB Endpoint", + "description": "The endpoint specifies which Astra DB database queries are sent to. It can be copied from the Database Details section of the Overview tab of the Database page in the Astra UI.", "pattern": "^https:\\/\\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-[^\\.]*?\\.apps\\.astra\\.datastax\\.com", "examples": [ "https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com" @@ -332,13 +332,14 @@ "type": "string" }, "astra_db_keyspace": { - "title": "AstraDB Keyspace", - "description": "Astra DB Keyspace", - "type": "string" + "title": "Astra DB Keyspace", + "description": "Keyspaces (or Namespaces) serve as containers for organizing data within a database. You can create a new keyspace uisng the Data Explorer tab in the Astra UI. The keyspace default_keyspace is created for you when you create a Vector Database in Astra DB.", + "type": "string", + "default": "default_keyspace" }, "collection": { - "title": "AstraDB collection", - "description": "AstraDB collection", + "title": "Astra DB collection", + "description": "Collections hold data. They are analagous to tables in traditional Cassandra terminology. This tool will create the collection with the provided name automatically if it does not already exist. 
Alternatively, you can create one thorugh the Data Explorer tab in the Astra UI.", "type": "string" } }, diff --git a/airbyte-integrations/connectors/destination-astra/metadata.yaml b/airbyte-integrations/connectors/destination-astra/metadata.yaml index c9c245c358d3..6a6abcd3807f 100644 --- a/airbyte-integrations/connectors/destination-astra/metadata.yaml +++ b/airbyte-integrations/connectors/destination-astra/metadata.yaml @@ -15,13 +15,13 @@ data: connectorSubtype: database connectorType: destination definitionId: 042ce96f-1158-4662-9543-e2ff015be97a - dockerImageTag: 0.1.0 + dockerImageTag: 0.1.1 dockerRepository: airbyte/destination-astra githubIssueLabel: destination-astra icon: astra.svg license: MIT - name: Astra - releaseDate: 2024-01-10 + name: Astra DB + releaseDate: 2024-01-26 releaseStage: alpha supportLevel: community documentationUrl: https://docs.airbyte.com/integrations/destinations/astra diff --git a/docs/integrations/destinations/astra.md b/docs/integrations/destinations/astra.md index 554368588307..95ab5e952ee5 100644 --- a/docs/integrations/destinations/astra.md +++ b/docs/integrations/destinations/astra.md @@ -1,4 +1,4 @@ -# Astra Destination +# Astra DB Destination This page contains the setup guide and reference information for the destination-astra connector. @@ -39,3 +39,4 @@ This page contains the setup guide and reference information for the destination | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :-------------------------- | | 0.1.0 | 2024-01-08 | | Initial Release | +| 0.1.1 | 2024-01-26 | | DS Branding Update | From cc6bfc17b41c534b53dd726c924b9a3e10f6921c Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 29 Jan 2024 11:58:38 +0100 Subject: [PATCH 16/17] make icon transparent background --- airbyte-integrations/connectors/destination-astra/icon.svg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-astra/icon.svg b/airbyte-integrations/connectors/destination-astra/icon.svg index 9ba7973201a2..ecc353976f51 100644 --- a/airbyte-integrations/connectors/destination-astra/icon.svg +++ b/airbyte-integrations/connectors/destination-astra/icon.svg @@ -1,5 +1,5 @@ - + From e4aa55d6ba61cd9d600585d546fabcb43bb3ec41 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 29 Jan 2024 12:01:06 +0100 Subject: [PATCH 17/17] format --- .../destination-astra/destination_astra/config.py | 12 ++++++++++-- .../destination-astra/integration_tests/spec.json | 3 +-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py index 89d8a8b6cc51..01d805ecd782 100644 --- a/airbyte-integrations/connectors/destination-astra/destination_astra/config.py +++ b/airbyte-integrations/connectors/destination-astra/destination_astra/config.py @@ -20,8 +20,16 @@ class AstraIndexingModel(BaseModel): pattern="^https:\\/\\/([a-z]|[0-9]){8}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){4}-([a-z]|[0-9]){12}-[^\\.]*?\\.apps\\.astra\\.datastax\\.com", examples=["https://8292d414-dd1b-4c33-8431-e838bedc04f7-us-east1.apps.astra.datastax.com"], ) - astra_db_keyspace: str = Field(..., title="Astra DB Keyspace", description="Keyspaces (or Namespaces) serve as containers for organizing data within a database. You can create a new keyspace uisng the Data Explorer tab in the Astra UI. 
The keyspace default_keyspace is created for you when you create a Vector Database in Astra DB.")
-    collection: str = Field(..., title="Astra DB collection", description="Collections hold data. They are analogous to tables in traditional Cassandra terminology. This tool will create the collection with the provided name automatically if it does not already exist. Alternatively, you can create one through the Data Explorer tab in the Astra UI.")
+    astra_db_keyspace: str = Field(
+        ...,
+        title="Astra DB Keyspace",
+        description="Keyspaces (or Namespaces) serve as containers for organizing data within a database. You can create a new keyspace using the Data Explorer tab in the Astra UI. The keyspace default_keyspace is created for you when you create a Vector Database in Astra DB.",
+    )
+    collection: str = Field(
+        ...,
+        title="Astra DB collection",
+        description="Collections hold data. They are analogous to tables in traditional Cassandra terminology. This tool will create the collection with the provided name automatically if it does not already exist. Alternatively, you can create one through the Data Explorer tab in the Astra UI.",
+    )
 
     class Config:
         title = "Indexing"
diff --git a/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json b/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json
index 4ce20decb60f..35951290a06c 100644
--- a/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json
+++ b/airbyte-integrations/connectors/destination-astra/integration_tests/spec.json
@@ -334,8 +334,7 @@
         "astra_db_keyspace": {
           "title": "Astra DB Keyspace",
           "description": "Keyspaces (or Namespaces) serve as containers for organizing data within a database. You can create a new keyspace using the Data Explorer tab in the Astra UI. The keyspace default_keyspace is created for you when you create a Vector Database in Astra DB.",
-          "type": "string",
-          "default": "default_keyspace"
+          "type": "string"
         },
         "collection": {
           "title": "Astra DB collection",