diff --git a/cognite/extractorutils/uploader/time_series.py b/cognite/extractorutils/uploader/time_series.py index 89b16e95..1b84d743 100644 --- a/cognite/extractorutils/uploader/time_series.py +++ b/cognite/extractorutils/uploader/time_series.py @@ -21,7 +21,12 @@ from requests import ConnectionError from cognite.client import CogniteClient -from cognite.client.data_classes import Sequence, SequenceData, TimeSeries +from cognite.client.data_classes import ( + Sequence, + SequenceData, + SequenceRows, + TimeSeries, +) from cognite.client.exceptions import CogniteAPIError, CogniteDuplicatedError, CogniteNotFoundError from cognite.extractorutils.uploader._base import ( RETRIES, @@ -361,7 +366,7 @@ def __init__( thread_name, cancellation_token, ) - self.upload_queue: Dict[EitherId, SequenceData] = {} + self.upload_queue: Dict[EitherId, SequenceRows] = {} self.sequence_metadata: Dict[EitherId, Dict[str, Union[str, int, float]]] = {} self.sequence_asset_external_ids: Dict[EitherId, str] = {} self.sequence_dataset_external_ids: Dict[EitherId, str] = {} @@ -435,6 +440,7 @@ def add_to_upload_queue( List[Tuple[int, Union[int, float, str]]], List[Dict[str, Any]], SequenceData, + SequenceRows, ], column_external_ids: Optional[List[dict]] = None, id: Optional[int] = None, @@ -459,19 +465,25 @@ def add_to_upload_queue( either_id = EitherId(id=id, external_id=external_id) - if isinstance(rows, SequenceData): - # Already in desired format + if isinstance(rows, SequenceRows): + # Already in the desired format pass - elif isinstance(rows, dict): - rows = [{"rowNumber": row_number, "values": values} for row_number, values in rows.items()] - - rows = SequenceData(id=id, external_id=id, rows=rows, columns=column_external_ids) # type: ignore - - elif isinstance(rows, list): - if isinstance(rows[0], tuple) or isinstance(rows[0], list): - rows = [{"rowNumber": row_number, "values": values} for row_number, values in rows] - - rows = SequenceData(id=id, external_id=id, rows=rows, columns=column_external_ids) # type: ignore + elif isinstance(rows, (dict, list)): + rows_raw: List[Dict[str, Any]] + if isinstance(rows, dict): + rows_raw = [{"rowNumber": row_number, "values": values} for row_number, values in rows.items()] + elif isinstance(rows, list) and rows and isinstance(rows[0], (tuple, list)): + rows_raw = [{"rowNumber": row_number, "values": values} for row_number, values in rows] + else: + rows_raw = rows # type: ignore[assignment] + rows = SequenceRows.load( + { + "rows": rows_raw, + "columns": column_external_ids, + "id": id, + "externalId": external_id, + } + ) else: raise TypeError("Unsupported type for sequence rows: {}".format(type(rows))) @@ -479,8 +491,7 @@ def add_to_upload_queue( seq = self.upload_queue.get(either_id) if seq is not None: # Update sequence - seq.values.extend(rows.values) # type: ignore # type is list, mypy is wrong - seq.row_numbers.extend(rows.row_numbers) # type: ignore + seq.rows.extend(rows.rows) # type: ignore[attr-defined] self.upload_queue[either_id] = seq else: @@ -585,7 +596,7 @@ def _create_or_update(self, either_id: EitherId) -> None: # Update definition of cached sequence cseq = self.upload_queue[either_id] - cseq.columns = seq.columns + cseq.columns = seq.columns # type: ignore[assignment] def _resolve_asset_ids(self) -> None: """ diff --git a/tests/tests_unit/test_cdf_upload_queues.py b/tests/tests_unit/test_cdf_upload_queues.py index d3ec6492..ffd39b68 100644 --- a/tests/tests_unit/test_cdf_upload_queues.py +++ b/tests/tests_unit/test_cdf_upload_queues.py @@ -221,11 +221,15 @@ def post(x): queue.start() queue.add_to_upload_queue( - rows=[{"rowNumber": 1, "values": {"field": "Hello"}}], column_external_ids=[], external_id="seq-1" + rows=[{"rowNumber": 1, "values": ["Hello"]}], + column_external_ids=[{"externalId": "field", "valueType": "String"}], + external_id="seq-1", ) queue.add_to_upload_queue( - rows=[{"rowNumber": 2, "values": {"field": "World"}}], column_external_ids=[], external_id="seq-1" + rows=[{"rowNumber": 2, "values": ["World"]}], + column_external_ids=[{"externalId": "field", "valueType": "String"}], + external_id="seq-1", ) time.sleep(2.1) @@ -249,11 +253,15 @@ def post(x): queue.start() queue.add_to_upload_queue( - rows=[{"rowNumber": 1, "values": {"field": "Hello"}}], column_external_ids=[], external_id="seq-1" + rows=[{"rowNumber": 1, "values": ["Hello"]}], + column_external_ids=[{"externalId": "field", "valueType": "String"}], + external_id="seq-1", ) queue.add_to_upload_queue( - rows=[{"rowNumber": 2, "values": {"field": "World"}}], column_external_ids=[], external_id="seq-2" + rows=[{"rowNumber": 2, "values": ["World"]}], + column_external_ids=[{"externalId": "field", "valueType": "String"}], + external_id="seq-2", ) time.sleep(2.1)