Skip to content

Commit

Permalink
restore kvdb to state from #35424 (#35454)
Browse files Browse the repository at this point in the history
  • Loading branch information
erohmensing authored Feb 20, 2024
1 parent 944c960 commit e7ab4f5
Show file tree
Hide file tree
Showing 11 changed files with 1,524 additions and 0 deletions.
118 changes: 118 additions & 0 deletions airbyte-integrations/connectors/destination-kvdb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Kvdb Destination

This is the repository for the [Kvdb](https://kvdb.io) destination connector, written in Python. It is intended to be an example for how to write a Python destination. KvDB is a very simple key value store, which makes it great for the purposes of illustrating how to write a Python destination connector.

## Local development

### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**

#### Minimum Python version required `= 3.7.0`

#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```

This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.

Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
should work as you expect.

#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-kvdb:build
```

#### Create credentials
**If you are a community contributor**, generate the necessary credentials from [Kvdb](https://kvdb.io/docs/api/), and then create a file `secrets/config.json` conforming to the `destination_kvdb/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.

**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination kvdb test creds`
and place them into `secrets/config.json`.

### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```

### Locally running the connector docker image



#### Build
**Via [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md) (recommended):**
```bash
airbyte-ci connectors --name=destination-kvdb build
```

An image will be built with the tag `airbyte/destination-kvdb:dev`.

**Via `docker build`:**
```bash
docker build -t airbyte/destination-kvdb:dev .
```
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-kvdb:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-kvdb:dev check --config /secrets/config.json
# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-kvdb:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
```
### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```

### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector).
#### Custom Integration tests
Place custom tests inside `integration_tests/` folder, then, from the connector root, run
```
python -m pytest integration_tests
```
#### Acceptance Tests
You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
```bash
airbyte-ci connectors --name=destination-kvdb test
```


## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies between two groups, dependencies that are:
* required for your connector to work need to go to `MAIN_REQUIREMENTS` list.
* required for the testing need to go to `TEST_REQUIREMENTS` list

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=destination-kvdb test`
2. Bump the connector version in `metadata.yaml`: increment the `dockerImageTag` value. Please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors).
3. Make sure the `metadata.yaml` content is up to date.
4. Make the connector documentation and its changelog is up to date (`docs/integrations/destinations/kvdb.md`).
5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention).
6. Pat yourself on the back for being an awesome contributor.
7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


from .destination import DestinationKvdb

__all__ = ["DestinationKvdb"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Iterable, List, Mapping, Tuple, Union

import requests


class KvDbClient:
base_url = "https://kvdb.io"
PAGE_SIZE = 1000

def __init__(self, bucket_id: str, secret_key: str = None):
self.secret_key = secret_key
self.bucket_id = bucket_id

def write(self, key: str, value: Mapping[str, Any]):
return self.batch_write([(key, value)])

def batch_write(self, keys_and_values: List[Tuple[str, Mapping[str, Any]]]):
"""
https://kvdb.io/docs/api/#execute-transaction
"""
request_body = {"txn": [{"set": key, "value": value} for key, value in keys_and_values]}
return self._request("POST", json=request_body)

def list_keys(self, list_values: bool = False, prefix: str = None) -> Iterable[Union[str, List]]:
"""
https://kvdb.io/docs/api/#list-keys
"""
# TODO handle rate limiting
pagination_complete = False
offset = 0

while not pagination_complete:
response = self._request(
"GET",
params={
"limit": self.PAGE_SIZE,
"skip": offset,
"format": "json",
"prefix": prefix or "",
"values": "true" if list_values else "false",
},
endpoint="/", # the "list" endpoint doesn't work without adding a trailing slash to the URL
)

response_json = response.json()
yield from response_json

pagination_complete = len(response_json) < self.PAGE_SIZE
offset += self.PAGE_SIZE

def delete(self, key: Union[str, List[str]]):
"""
https://kvdb.io/docs/api/#execute-transaction
"""
key_list = key if isinstance(key, List) else [key]
request_body = {"txn": [{"delete": k} for k in key_list]}
return self._request("POST", json=request_body)

def _get_base_url(self) -> str:
return f"{self.base_url}/{self.bucket_id}"

def _get_auth_headers(self) -> Mapping[str, Any]:
return {"Authorization": f"Bearer {self.secret_key}"} if self.secret_key else {}

def _request(
self, http_method: str, endpoint: str = None, params: Mapping[str, Any] = None, json: Mapping[str, Any] = None
) -> requests.Response:
url = self._get_base_url() + (endpoint or "")
headers = {"Accept": "application/json", **self._get_auth_headers()}

response = requests.request(method=http_method, params=params, url=url, headers=headers, json=json)

response.raise_for_status()
return response
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import time
import traceback
import uuid
from typing import Any, Iterable, Mapping

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
from destination_kvdb.client import KvDbClient
from destination_kvdb.writer import KvDbWriter


class DestinationKvdb(Destination):
def write(
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:

"""
Reads the input stream of messages, config, and catalog to write data to the destination.
This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received
in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been
successfully persisted to the destination. This is used to ensure fault tolerance in the case that a sync fails before fully completing,
then the source is given the last state message output from this method as the starting point of the next sync.
"""
writer = KvDbWriter(KvDbClient(**config))

for configured_stream in configured_catalog.streams:
if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
writer.delete_stream_entries(configured_stream.stream.name)

for message in input_messages:
if message.type == Type.STATE:
# Emitting a state message indicates that all records which came before it have been written to the destination. So we flush
# the queue to ensure writes happen, then output the state message to indicate it's safe to checkpoint state
writer.flush()
yield message
elif message.type == Type.RECORD:
record = message.record
writer.queue_write_operation(
record.stream, record.data, time.time_ns() / 1_000_000
) # convert from nanoseconds to milliseconds
else:
# ignore other message types for now
continue

# Make sure to flush any records still in the queue
writer.flush()

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the destination with the needed permissions
e.g: if a provided API token or password can be used to connect and write to the destination.
"""
try:
# Verify write access by attempting to write and then delete to a random key
client = KvDbClient(**config)
random_key = str(uuid.uuid4())
client.write(random_key, {"value": "_airbyte_connection_check"})
client.delete(random_key)
except Exception as e:
traceback.print_exc()
return AirbyteConnectionStatus(
status=Status.FAILED, message=f"An exception occurred: {e}. \nStacktrace: \n{traceback.format_exc()}"
)
else:
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"documentationUrl": "https://kvdb.io/docs/api/",
"supported_destination_sync_modes": ["overwrite", "append"],
"supportsIncremental": true,
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Destination KVdb",
"type": "object",
"required": ["bucket_id", "secret_key"],
"additionalProperties": false,
"properties": {
"bucket_id": {
"title": "Bucket ID",
"type": "string",
"description": "The ID of your KVdb bucket.",
"order": 1
},
"secret_key": {
"title": "Secret Key",
"type": "string",
"description": "Your bucket Secret Key.",
"order": 2
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from collections import Mapping

from destination_kvdb.client import KvDbClient


class KvDbWriter:
"""
Data is written to KvDB in the following format:
key: stream_name__ab__<record_extraction_timestamp>
value: a JSON object representing the record's data
This is because unless a data source explicitly designates a primary key, we don't know what to key the record on.
Since KvDB allows reading records with certain prefixes, we treat it more like a message queue, expecting the reader to
read messages with a particular prefix e.g: name__ab__123, where 123 is the timestamp they last read data from.
"""

write_buffer = []
flush_interval = 1000

def __init__(self, client: KvDbClient):
self.client = client

def delete_stream_entries(self, stream_name: str):
"""Deletes all the records belonging to the input stream"""
keys_to_delete = []
for key in self.client.list_keys(prefix=f"{stream_name}__ab__"):
keys_to_delete.append(key)
if len(keys_to_delete) == self.flush_interval:
self.client.delete(keys_to_delete)
keys_to_delete.clear()
if len(keys_to_delete) > 0:
self.client.delete(keys_to_delete)

def queue_write_operation(self, stream_name: str, record: Mapping, written_at: int):
kv_pair = (f"{stream_name}__ab__{written_at}", record)
self.write_buffer.append(kv_pair)
if len(self.write_buffer) == self.flush_interval:
self.flush()

def flush(self):
self.client.batch_write(self.write_buffer)
self.write_buffer.clear()
11 changes: 11 additions & 0 deletions airbyte-integrations/connectors/destination-kvdb/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import sys

from destination_kvdb import DestinationKvdb

if __name__ == "__main__":
DestinationKvdb().run(sys.argv[1:])
Loading

0 comments on commit e7ab4f5

Please sign in to comment.