Skip to content

Commit

Permalink
Add option to create missing time series from upload queue (#65)
Browse files Browse the repository at this point in the history
This will create the missing time series whenever external IDs are
used.

Also fixes an issue in EitherId where the __repr__ method didn't return
a string (as expected).
  • Loading branch information
mathialo authored Aug 27, 2020
1 parent 4494216 commit 08b8e4f
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 10 deletions.
33 changes: 29 additions & 4 deletions cognite/extractorutils/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from retry import retry

from cognite.client import CogniteClient
from cognite.client.data_classes import Event
from cognite.client.data_classes import Event, TimeSeries
from cognite.client.data_classes.raw import Row
from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError

Expand Down Expand Up @@ -310,6 +310,7 @@ class TimeSeriesUploadQueue(AbstractUploadQueue):
methods).
trigger_log_level: Log level to log upload triggers to.
thread_name: Thread name of uploader thread.
create_missing: Create missing time series if possible (ie, if external id is used)
"""

def __init__(
Expand All @@ -320,11 +321,13 @@ def __init__(
max_upload_interval: Optional[int] = None,
trigger_log_level: str = "DEBUG",
thread_name: Optional[str] = None,
create_missing: bool = False,
):
# Super sets post_upload and threshold
super().__init__(
cdf_client, post_upload_function, max_queue_size, max_upload_interval, trigger_log_level, thread_name
)
self.create_missing = create_missing

self.upload_queue: Dict[EitherId, DataPointList] = dict()

Expand Down Expand Up @@ -380,19 +383,40 @@ def upload(self) -> None:
max_delay=RETRY_MAX_DELAY,
backoff=RETRY_BACKOFF_FACTOR,
)
def _upload_batch(self, upload_this: List[Dict]) -> List[Dict]:
def _upload_batch(self, upload_this: List[Dict], retries=5) -> List[Dict]:
if len(upload_this) == 0:
return upload_this

try:
self.cdf_client.datapoints.insert_multiple(upload_this)

except CogniteNotFoundError as ex:
self.logger.error("Could not upload data points to %s: %s", str(ex.not_found), str(ex))
if not retries:
raise ex

if not self.create_missing:
self.logger.error("Could not upload data points to %s: %s", str(ex.not_found), str(ex))

# Get IDs of time series that exists, but failed because of the non-existing time series
retry_these = [EitherId(**id_dict) for id_dict in ex.failed if id_dict not in ex.not_found]

if self.create_missing:
# Get the time series that can be created
create_these = [id_dict["externalId"] for id_dict in ex.not_found if "externalId" in id_dict]

self.logger.info(f"Creating {len(create_these)} time series")
self.cdf_client.time_series.create([TimeSeries(external_id=i) for i in create_these])

retry_these.extend([EitherId(external_id=i) for i in create_these])

if len(ex.not_found) != len(create_these):
missing = [id_dict for id_dict in ex.not_found if id_dict.get("externalId") not in retry_these]
self.logger.error(
f"{len(ex.not_found) - len(create_these)} time series not found, and could not be created automatically:\n"
+ str(missing)
+ "\nData will be dropped"
)

# Remove entries with non-existing time series from upload queue
upload_this = [
entry
Expand All @@ -401,7 +425,8 @@ def _upload_batch(self, upload_this: List[Dict]) -> List[Dict]:
]

# Upload remaining
self.cdf_client.datapoints.insert_multiple(upload_this)
self._upload_batch(upload_this, retries - 1)

return upload_this

def __enter__(self) -> "TimeSeriesUploadQueue":
Expand Down
8 changes: 4 additions & 4 deletions cognite/extractorutils/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ def __str__(self) -> str:
"""
return "{}: {}".format(self.type(), self.content())

def __repr__(self) -> Dict[str, Union[str, int]]:
def __repr__(self) -> str:
"""
Returns a dict containing the ID type as key and ID as value
Get a string representation of the EitherId on the format "type: content".
Returns:
A dictionary representation of the EitherId
A string rep of the EitherId
"""
return {self.type(): self.content()}
return self.__str__()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-extractor-utils"
version = "1.1.0"
version = "1.1.1"
description = "Utilities for easier development of extractors for CDF"
authors = ["Mathias Lohne <[email protected]>"]
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion tests/tests_unit/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,4 @@ def test_hash(self):
self.assertTrue(hash(id1) == hash(id2))

def test_repr(self):
self.assertDictEqual(EitherId(externalId="extId").__repr__(), {"externalId": "extId"})
self.assertEqual(EitherId(externalId="extId").__repr__(), "externalId: extId")

0 comments on commit 08b8e4f

Please sign in to comment.