Skip to content

Commit

Permalink
asyncio tables and api and machines
Browse files Browse the repository at this point in the history
  • Loading branch information
henryivesjones committed Oct 24, 2022
1 parent e0acc9a commit e8a83d0
Show file tree
Hide file tree
Showing 14 changed files with 235 additions and 57 deletions.
26 changes: 26 additions & 0 deletions examples/asyncio_upload_csv_to_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import asyncio
import time

from tulip_api.asyncio import TulipAPI, TulipTable, TulipTableCSVUploader

concurrency = 40
filename = "example.csv"
table_id = "PT929bpqB3s84bbTf"


async def main():
start_time = time.time()
with TulipAPI(
"abc.tulip.co",
concurrency=concurrency,
) as api:
uploaded_records = await TulipTableCSVUploader(
TulipTable(api, table_id), filename
).execute(create_random_id=True, warn_on_failure=True)
print(
f"Uploaded {uploaded_records} to table {table_id} in {round(time.time() - start_time, 2)} seconds."
)


if __name__ == "__main__":
asyncio.run(main())
23 changes: 23 additions & 0 deletions scripts/runBuildInstallExamples.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash
set -e
cd "$(dirname "${BASH_SOURCE[0]}")"

cd ..
python3 -m venv .venv
source .venv/bin/activate

pip3 install -r dev-requirements.txt >/dev/null
echo "Installed DEV-REQUIREMENTS..."

rm -rf -f dist

python3 -m build >/dev/null
echo "Built community-tulip-api package."

cd dist

pip3 uninstall community-tulip-api -y >/dev/null
echo "Uninstalled existing community-tulip-api installation."

pip3 install $(ls -AU | head -1) >/dev/null
echo "Installed new community-tulip-api build."
12 changes: 6 additions & 6 deletions tulip_api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .cached_tulip_table import CachedTulipTable
from .tulip_api import TulipAPI
from .tulip_machine import TulipMachine
from .tulip_table import TulipTable
from .tulip_table_csv_upload import TulipTableCSVUploader
from .tulip_table_link import TulipTableLink
from tulip_api.cached_tulip_table import CachedTulipTable
from tulip_api.tulip_api import TulipAPI
from tulip_api.tulip_machine import TulipMachine
from tulip_api.tulip_table import TulipTable
from tulip_api.tulip_table_csv_upload import TulipTableCSVUploader
from tulip_api.tulip_table_link import TulipTableLink
9 changes: 3 additions & 6 deletions tulip_api/asyncio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from .tulip_api import TulipAPI

# from .tulip_machine import TulipMachine
from .tulip_table import TulipTable

# from .tulip_table_link import TulipTableLink
from tulip_api.asyncio.tulip_api import TulipAPI
from tulip_api.asyncio.tulip_table import TulipTable
from tulip_api.asyncio.tulip_table_csv_upload import TulipTableCSVUploader
36 changes: 11 additions & 25 deletions tulip_api/asyncio/tulip_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TulipAPIAsyncUnknownResponse,
TulipAPINoCredentialsFound,
)
from tulip_api.tulip_api import TulipAPIResponseCodes


class TulipAPI:
Expand All @@ -24,6 +25,7 @@ class TulipAPI:
def __init__(
self,
tulip_url: str,
concurrency: int = 40,
api_key: Union[str, None] = None,
api_key_secret: Union[str, None] = None,
auth: Union[str, None] = None,
Expand All @@ -39,23 +41,14 @@ def __init__(
)

self.headers = self._construct_headers()
# self.session = aiohttp.ClientSession()

# async def _make_request(
# self,
# path: str,
# method: str,
# params: Union[dict, None] = None,
# json: Any = None,
# ):
# async with aiohttp.request(
# method,
# self._construct_url(path),
# params=params,
# json=json,
# headers=self.headers,
# ) as response:
# return self._handle_api_response(response)
self.concurrency = concurrency

def __enter__(self):
self.connector = aiohttp.TCPConnector(limit=self.concurrency)
return self

def __exit__(self, _, __, ___):
self.connector.close()

async def make_request(
self,
Expand All @@ -73,6 +66,7 @@ async def make_request(
params=params,
json=json,
headers=self.headers,
connector=self.connector,
) as response:
return await self._handle_api_response(response).json()

Expand Down Expand Up @@ -136,11 +130,3 @@ def _handle_api_response(self, response: aiohttp.ClientResponse):
if response.status in TulipAPIResponseCodes.UNEXCPECTED_ERROR_CODES:
raise TulipAPIAsyncInternalError(response)
raise TulipAPIAsyncUnknownResponse(response)


class TulipAPIResponseCodes:
SUCCESS_CODES = {200, 201, 204}
MALFORMED_CODES = {400, 422}
NOT_FOUND_CODES = {404}
UNAUTHENTICATED_CODES = {401, 403}
UNEXCPECTED_ERROR_CODES = {500}
41 changes: 41 additions & 0 deletions tulip_api/asyncio/tulip_machine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from typing import Any, Dict

from tulip_api.asyncio.tulip_api import TulipAPI


class TulipMachine:
"""
An interface with the machine event reporting api.
"""

attributes_report_path: str = "attributes/report"

def __init__(self, tulip_api: TulipAPI, machine_id: str):
self.tulip_api = tulip_api
self.machine_id = machine_id

async def send_event(self, attributes: Dict[str, Any]):
"""
POST `/attributes/report`
`attributes`: A dict with attributeId: value pairs.
```
{
"attributeIdA": "valueA",
"attributeIdB": "valueB",
}
```
"""
await self.tulip_api.make_request_expect_nothing(
TulipMachine.attributes_report_path,
"POST",
json=self._construct_attributes(attributes),
)

def _construct_attributes(self, attributes: Dict[str, Any]):
return {
"attributes": [
{"machineId": self.machine_id, "attributeId": key, "value": value}
for key, value in attributes.items()
]
}
22 changes: 15 additions & 7 deletions tulip_api/asyncio/tulip_table.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import json
from typing import Any, AsyncGenerator, Dict, Generator, Iterable, List, Union
import asyncio
from typing import Any, AsyncGenerator, Iterable, List, Union
from uuid import uuid4

from tulip_api.asyncio import TulipAPI
from tulip_api.exceptions import (
TulipAPIInvalidChunkSize,
TulipAPIMalformedRequestError,
TulipApiTableRecordCreateMustIncludeID,
)

Expand Down Expand Up @@ -223,15 +222,24 @@ async def create_records(
"""
created_records = 0
failed_records = 0
futures: List[asyncio.Task] = []
for record in records:
futures.append(
asyncio.create_task(
self.create_record(record, create_random_id=create_random_id)
)
)

for future in asyncio.as_completed(futures):
try:
await self.create_record(record, create_random_id=create_random_id)
await future
created_records += 1
except TulipAPIMalformedRequestError as exception:
except Exception as e:
failed_records += 1
print(f"There was an issue creating the record:\n{json.dumps(record)}")
print("There was an issue creating a record")
if not warn_on_failure:
raise exception
raise e

if warn_on_failure and failed_records > 0:
print(f"Failed to create {failed_records} records.")

Expand Down
88 changes: 88 additions & 0 deletions tulip_api/asyncio/tulip_table_csv_upload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from csv import DictReader
from typing import Any, Dict, Iterable, TextIO, Union

from dateutil import parser

from tulip_api.asyncio.tulip_table import TulipTable


class TulipTableCSVUploader:
tulip_table: TulipTable

def __init__(self, tulip_table: TulipTable, csv_file: Union[str, TextIO]):
self.tulip_table = tulip_table
self.csv_file = csv_file

async def execute(self, create_random_id=False, warn_on_failure=False) -> int:
if isinstance(self.csv_file, str):
with open(self.csv_file, "r") as csv_file:
return await self._upload_records(
csv_file,
create_random_id=create_random_id,
warn_on_failure=warn_on_failure,
)

return await self._upload_records(
self.csv_file,
create_random_id=create_random_id,
warn_on_failure=warn_on_failure,
)

async def _upload_records(
self, file: TextIO, create_random_id=False, warn_on_failure=False
):
table_columns = (await self.tulip_table.get_details())["columns"]
column_types = {
column["name"]: column["dataType"]["type"] for column in table_columns
}
reader = DictReader(file)
TulipTableCSVUploader._validate_csv_columns(reader.fieldnames, column_types)

return await self.tulip_table.create_records(
TulipTableCSVUploader._yield_coerced_records(reader, column_types),
warn_on_failure=warn_on_failure,
create_random_id=create_random_id,
)

@staticmethod
def _coerce_type(value: Any, type: str):
if type == "string":
return str(value)
if type == "integer":
if isinstance(value, str):
return int(float(value))
return int(value)
if type == "float":
return float(value)
if type == "boolean":
return bool(value)
if type == "timestamp":
return parser.parse(value, ignoretz=True).strftime("%Y-%m-%dT%H:%M:%SZ")

raise Exception("Unsupported datatype: {type}. Value: {value}")

@staticmethod
def _coerce_record_types(record: Dict[str, Any], column_types: Dict[str, str]):
new_record = {}
for column_id, value in record.items():
if column_id not in column_types:
raise Exception(
f"Column {column_id} found in record, but not in table."
)
new_record[column_id] = TulipTableCSVUploader._coerce_type(
value, column_types[column_id]
)
return new_record

@staticmethod
def _yield_coerced_records(records: Iterable, column_types: Dict[str, str]):
for record in records:
yield TulipTableCSVUploader._coerce_record_types(record, column_types)

@staticmethod
def _validate_csv_columns(csv_fieldnames, table_columns):
for csv_fieldname in csv_fieldnames:
if not csv_fieldname in table_columns:
raise Exception(
f"Column {csv_fieldname} is not found in the Tulip Table columns."
)
6 changes: 3 additions & 3 deletions tulip_api/cached_tulip_table.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Dict, List

from .exceptions import (
from tulip_api.exceptions import (
TulipAPICachedTableDuplicateIDFound,
TulipApiCachedTableRecordNotFound,
)
from .tulip_api import TulipAPI
from .tulip_table import TulipTable
from tulip_api.tulip_api import TulipAPI
from tulip_api.tulip_table import TulipTable


class CachedTulipTable:
Expand Down
2 changes: 1 addition & 1 deletion tulip_api/tulip_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import requests

from .exceptions import (
from tulip_api.exceptions import (
TulipAPIAuthorizationError,
TulipAPIInternalError,
TulipAPIMalformedRequestError,
Expand Down
2 changes: 1 addition & 1 deletion tulip_api/tulip_machine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any, Dict

from .tulip_api import TulipAPI
from tulip_api.tulip_api import TulipAPI


class TulipMachine:
Expand Down
4 changes: 2 additions & 2 deletions tulip_api/tulip_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
from typing import Any, Dict, Generator, Iterable, List, Union
from uuid import uuid4

from .exceptions import (
from tulip_api.exceptions import (
TulipAPIInvalidChunkSize,
TulipAPIMalformedRequestError,
TulipApiTableRecordCreateMustIncludeID,
)
from .tulip_api import TulipAPI
from tulip_api.tulip_api import TulipAPI


class TulipTable:
Expand Down
19 changes: 14 additions & 5 deletions tulip_api/tulip_table_csv_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from dateutil import parser

from .tulip_table import TulipTable
from tulip_api.tulip_table import TulipTable


class TulipTableCSVUploader:
Expand All @@ -16,12 +16,21 @@ def __init__(self, tulip_table: TulipTable, csv_file: Union[str, TextIO]):
def execute(self, create_random_id=False, warn_on_failure=False) -> int:
if isinstance(self.csv_file, str):
with open(self.csv_file, "r") as csv_file:
return self._upload_records(csv_file, create_random_id=create_random_id, warn_on_failure=warn_on_failure)

return self._upload_records(self.csv_file, create_random_id=create_random_id, warn_on_failure=warn_on_failure)
return self._upload_records(
csv_file,
create_random_id=create_random_id,
warn_on_failure=warn_on_failure,
)

return self._upload_records(
self.csv_file,
create_random_id=create_random_id,
warn_on_failure=warn_on_failure,
)

def _upload_records(self, file: TextIO, create_random_id=False, warn_on_failure=False):
def _upload_records(
self, file: TextIO, create_random_id=False, warn_on_failure=False
):
table_columns = self.tulip_table.get_details()["columns"]
column_types = {
column["name"]: column["dataType"]["type"] for column in table_columns
Expand Down
Loading

0 comments on commit e8a83d0

Please sign in to comment.