Skip to content

Commit

Permalink
refactor: Support v7 SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
doctrino committed Nov 13, 2023
1 parent 8a3ea60 commit 7551c5b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 21 deletions.
45 changes: 28 additions & 17 deletions cognite/extractorutils/uploader/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
from requests import ConnectionError

from cognite.client import CogniteClient
from cognite.client.data_classes import Sequence, SequenceData, TimeSeries
from cognite.client.data_classes import (
Sequence,
SequenceData,
SequenceRows,
TimeSeries,
)
from cognite.client.exceptions import CogniteAPIError, CogniteDuplicatedError, CogniteNotFoundError
from cognite.extractorutils.uploader._base import (
RETRIES,
Expand Down Expand Up @@ -361,7 +366,7 @@ def __init__(
thread_name,
cancellation_token,
)
self.upload_queue: Dict[EitherId, SequenceData] = {}
self.upload_queue: Dict[EitherId, SequenceRows] = {}
self.sequence_metadata: Dict[EitherId, Dict[str, Union[str, int, float]]] = {}
self.sequence_asset_external_ids: Dict[EitherId, str] = {}
self.sequence_dataset_external_ids: Dict[EitherId, str] = {}
Expand Down Expand Up @@ -435,6 +440,7 @@ def add_to_upload_queue(
List[Tuple[int, Union[int, float, str]]],
List[Dict[str, Any]],
SequenceData,
SequenceRows,
],
column_external_ids: Optional[List[dict]] = None,
id: Optional[int] = None,
Expand All @@ -459,28 +465,33 @@ def add_to_upload_queue(

either_id = EitherId(id=id, external_id=external_id)

if isinstance(rows, SequenceData):
# Already in desired format
if isinstance(rows, SequenceRows):
# Already in the desired format
pass
elif isinstance(rows, dict):
rows = [{"rowNumber": row_number, "values": values} for row_number, values in rows.items()]

rows = SequenceData(id=id, external_id=id, rows=rows, columns=column_external_ids) # type: ignore

elif isinstance(rows, list):
if isinstance(rows[0], tuple) or isinstance(rows[0], list):
rows = [{"rowNumber": row_number, "values": values} for row_number, values in rows]

rows = SequenceData(id=id, external_id=id, rows=rows, columns=column_external_ids) # type: ignore
elif isinstance(rows, (dict, list)):
rows_raw: List[Dict[str, Any]]
if isinstance(rows, dict):
rows_raw = [{"rowNumber": row_number, "values": values} for row_number, values in rows.items()]
elif isinstance(rows, list) and rows and isinstance(rows[0], (tuple, list)):
rows_raw = [{"rowNumber": row_number, "values": values} for row_number, values in rows]
else:
rows_raw = rows # type: ignore[assignment]
rows = SequenceRows.load(
{
"rows": rows_raw,
"columns": column_external_ids,
"id": id,
"externalId": external_id,
}
)
else:
raise TypeError("Unsupported type for sequence rows: {}".format(type(rows)))

with self.lock:
seq = self.upload_queue.get(either_id)
if seq is not None:
# Update sequence
seq.values.extend(rows.values) # type: ignore # type is list, mypy is wrong
seq.row_numbers.extend(rows.row_numbers) # type: ignore
seq.rows.extend(rows.rows) # type: ignore[attr-defined]

self.upload_queue[either_id] = seq
else:
Expand Down Expand Up @@ -585,7 +596,7 @@ def _create_or_update(self, either_id: EitherId) -> None:

# Update definition of cached sequence
cseq = self.upload_queue[either_id]
cseq.columns = seq.columns
cseq.columns = seq.columns # type: ignore[assignment]

def _resolve_asset_ids(self) -> None:
"""
Expand Down
16 changes: 12 additions & 4 deletions tests/tests_unit/test_cdf_upload_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,15 @@ def post(x):
queue.start()

queue.add_to_upload_queue(
rows=[{"rowNumber": 1, "values": {"field": "Hello"}}], column_external_ids=[], external_id="seq-1"
rows=[{"rowNumber": 1, "values": ["Hello"]}],
column_external_ids=[{"externalId": "field", "valueType": "String"}],
external_id="seq-1",
)

queue.add_to_upload_queue(
rows=[{"rowNumber": 2, "values": {"field": "World"}}], column_external_ids=[], external_id="seq-1"
rows=[{"rowNumber": 2, "values": ["World"]}],
column_external_ids=[{"externalId": "field", "valueType": "String"}],
external_id="seq-1",
)

time.sleep(2.1)
Expand All @@ -249,11 +253,15 @@ def post(x):
queue.start()

queue.add_to_upload_queue(
rows=[{"rowNumber": 1, "values": {"field": "Hello"}}], column_external_ids=[], external_id="seq-1"
rows=[{"rowNumber": 1, "values": ["Hello"]}],
column_external_ids=[{"externalId": "field", "valueType": "String"}],
external_id="seq-1",
)

queue.add_to_upload_queue(
rows=[{"rowNumber": 2, "values": {"field": "World"}}], column_external_ids=[], external_id="seq-2"
rows=[{"rowNumber": 2, "values": ["World"]}],
column_external_ids=[{"externalId": "field", "valueType": "String"}],
external_id="seq-2",
)

time.sleep(2.1)
Expand Down

0 comments on commit 7551c5b

Please sign in to comment.