Skip to content

Commit

Permalink
Cognite-SDK v7 (#262)
Browse files Browse the repository at this point in the history
* build: Set cognite-sdk verson to 7 pre-release

* refactor: Support v7 SDK

* build: changelog

* style

* Update pyproject.toml

Co-authored-by: Håkon V. Treider <[email protected]>

* build: bump version
  • Loading branch information
doctrino authored Nov 21, 2023
1 parent eeed8fa commit 4833a3c
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 24 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [6.0.0]

### Changed

* `cognite-sdk` to `v7`

## [5.5.1]

### Added
Expand Down
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "5.5.1"
__version__ = "6.0.0"
from .base import Extractor
45 changes: 28 additions & 17 deletions cognite/extractorutils/uploader/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
from requests import ConnectionError

from cognite.client import CogniteClient
from cognite.client.data_classes import Sequence, SequenceData, TimeSeries
from cognite.client.data_classes import (
Sequence,
SequenceData,
SequenceRows,
TimeSeries,
)
from cognite.client.exceptions import CogniteAPIError, CogniteDuplicatedError, CogniteNotFoundError
from cognite.extractorutils.uploader._base import (
RETRIES,
Expand Down Expand Up @@ -361,7 +366,7 @@ def __init__(
thread_name,
cancellation_token,
)
self.upload_queue: Dict[EitherId, SequenceData] = {}
self.upload_queue: Dict[EitherId, SequenceRows] = {}
self.sequence_metadata: Dict[EitherId, Dict[str, Union[str, int, float]]] = {}
self.sequence_asset_external_ids: Dict[EitherId, str] = {}
self.sequence_dataset_external_ids: Dict[EitherId, str] = {}
Expand Down Expand Up @@ -435,6 +440,7 @@ def add_to_upload_queue(
List[Tuple[int, Union[int, float, str]]],
List[Dict[str, Any]],
SequenceData,
SequenceRows,
],
column_external_ids: Optional[List[dict]] = None,
id: Optional[int] = None,
Expand All @@ -459,28 +465,33 @@ def add_to_upload_queue(

either_id = EitherId(id=id, external_id=external_id)

if isinstance(rows, SequenceData):
# Already in desired format
if isinstance(rows, SequenceRows):
# Already in the desired format
pass
elif isinstance(rows, dict):
rows = [{"rowNumber": row_number, "values": values} for row_number, values in rows.items()]

rows = SequenceData(id=id, external_id=id, rows=rows, columns=column_external_ids) # type: ignore

elif isinstance(rows, list):
if isinstance(rows[0], tuple) or isinstance(rows[0], list):
rows = [{"rowNumber": row_number, "values": values} for row_number, values in rows]

rows = SequenceData(id=id, external_id=id, rows=rows, columns=column_external_ids) # type: ignore
elif isinstance(rows, (dict, list)):
rows_raw: List[Dict[str, Any]]
if isinstance(rows, dict):
rows_raw = [{"rowNumber": row_number, "values": values} for row_number, values in rows.items()]
elif isinstance(rows, list) and rows and isinstance(rows[0], (tuple, list)):
rows_raw = [{"rowNumber": row_number, "values": values} for row_number, values in rows]
else:
rows_raw = rows # type: ignore[assignment]
rows = SequenceRows.load(
{
"rows": rows_raw,
"columns": column_external_ids,
"id": id,
"externalId": external_id,
}
)
else:
raise TypeError("Unsupported type for sequence rows: {}".format(type(rows)))

with self.lock:
seq = self.upload_queue.get(either_id)
if seq is not None:
# Update sequence
seq.values.extend(rows.values) # type: ignore # type is list, mypy is wrong
seq.row_numbers.extend(rows.row_numbers) # type: ignore
seq.rows.extend(rows.rows) # type: ignore[attr-defined]

self.upload_queue[either_id] = seq
else:
Expand Down Expand Up @@ -585,7 +596,7 @@ def _create_or_update(self, either_id: EitherId) -> None:

# Update definition of cached sequence
cseq = self.upload_queue[either_id]
cseq.columns = seq.columns
cseq.columns = seq.columns # type: ignore[assignment]

def _resolve_asset_ids(self) -> None:
"""
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-extractor-utils"
version = "5.5.1"
version = "6.0.0"
description = "Utilities for easier development of extractors for CDF"
authors = ["Mathias Lohne <[email protected]>"]
license = "Apache-2.0"
Expand Down Expand Up @@ -51,7 +51,7 @@ exclude = "tests/*"

[tool.poetry.dependencies]
python = "^3.8.0"
cognite-sdk = ">=6.0, <7"
cognite-sdk = "^7"
prometheus-client = ">0.7.0, <=1.0.0"
arrow = "^1.0.0"
pyyaml = ">=5.3.0, <7"
Expand Down
16 changes: 12 additions & 4 deletions tests/tests_unit/test_cdf_upload_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,15 @@ def post(x):
queue.start()

queue.add_to_upload_queue(
rows=[{"rowNumber": 1, "values": {"field": "Hello"}}], column_external_ids=[], external_id="seq-1"
rows=[{"rowNumber": 1, "values": ["Hello"]}],
column_external_ids=[{"externalId": "field", "valueType": "String"}],
external_id="seq-1",
)

queue.add_to_upload_queue(
rows=[{"rowNumber": 2, "values": {"field": "World"}}], column_external_ids=[], external_id="seq-1"
rows=[{"rowNumber": 2, "values": ["World"]}],
column_external_ids=[{"externalId": "field", "valueType": "String"}],
external_id="seq-1",
)

time.sleep(2.1)
Expand All @@ -249,11 +253,15 @@ def post(x):
queue.start()

queue.add_to_upload_queue(
rows=[{"rowNumber": 1, "values": {"field": "Hello"}}], column_external_ids=[], external_id="seq-1"
rows=[{"rowNumber": 1, "values": ["Hello"]}],
column_external_ids=[{"externalId": "field", "valueType": "String"}],
external_id="seq-1",
)

queue.add_to_upload_queue(
rows=[{"rowNumber": 2, "values": {"field": "World"}}], column_external_ids=[], external_id="seq-2"
rows=[{"rowNumber": 2, "values": ["World"]}],
column_external_ids=[{"externalId": "field", "valueType": "String"}],
external_id="seq-2",
)

time.sleep(2.1)
Expand Down

0 comments on commit 4833a3c

Please sign in to comment.