Skip to content

Commit

Permalink
Bugfix asset hierarchy upsert (#1368)
Browse files Browse the repository at this point in the history
  • Loading branch information
haakonvt authored Sep 14, 2023
1 parent 9794839 commit 9d88fd9
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 29 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [6.24.1] - 2023-09-13
### Fixed
- Bugfix for `AssetsAPI.create_hierarchy` when running in upsert mode: It could skip certain updates when the
number of assets exceeded the single-request create limit (currently 1000 assets).

## [6.24.0] - 2023-09-12
### Fixed
- Bugfix for `FilesAPI.upload` and `FilesAPI.upload_bytes` not raising an error on file contents upload failure. Now `CogniteFileUploadError` is raised based on upload response.
Expand Down
58 changes: 33 additions & 25 deletions cognite/client/_api/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import itertools
import math
import operator as op
import threading
import warnings
from functools import cached_property
from types import MappingProxyType
Expand Down Expand Up @@ -703,6 +704,9 @@ def create_hierarchy(
... else:
... hierarchy.validate_and_report(output_file=Path("report.txt"))
"""
if upsert and upsert_mode not in ("patch", "replace"):
raise ValueError(f"'upsert_mode' must be either 'patch' or 'replace', not {upsert_mode!r}")

if not isinstance(assets, AssetHierarchy):
utils._auxiliary.assert_type(assets, "assets", [Sequence])
assets = AssetHierarchy(assets, ignore_orphans=True)
Expand Down Expand Up @@ -1157,7 +1161,8 @@ def __init__(self, hierarchy: AssetHierarchy, assets_api: AssetsAPI) -> None:
self.max_workers = assets_api._config.max_workers
self.failed: list[Asset] = []
self.unknown: list[Asset] = []
self.latest_exception: Exception | None = None
# Each thread needs to store its latest exception:
self.latest_exception: dict[int, Exception | None] = {}

self.__counter = itertools.count().__next__

Expand All @@ -1167,15 +1172,19 @@ def create(self, upsert: bool, upsert_mode: Literal["patch", "replace"]) -> Asse
subtree_count = self.hierarchy.count_subtree(insert_dct)

with get_priority_executor(max_workers=self.max_workers) as pool:
return self._create(pool, insert_fn, insert_dct, subtree_count)
created_assets = self._create(pool, insert_fn, insert_dct, subtree_count)

if all_exceptions := [exc for exc in self.latest_exception.values() if exc is not None]:
self._raise_latest_exception(all_exceptions, created_assets)
return AssetList(created_assets, cognite_client=self.assets_api._cognite_client)

def _create(
self,
pool: PriorityThreadPoolExecutor,
insert_fn: Callable[[list[Asset]], _TaskResult],
insert_dct: dict[str | None, list[Asset]],
subtree_count: dict[str, int],
) -> AssetList:
) -> list[Asset]:
queue_fn = functools.partial(
self._queue_tasks,
pool=pool,
Expand All @@ -1202,10 +1211,7 @@ def _create(
# Newly created assets are now unblocked as parents for the next iteration:
to_create = list(self._pop_child_assets(new_assets, insert_dct))
futures |= queue_fn(to_create)

if self.latest_exception is not None:
self._raise_latest_exception(created_assets)
return AssetList(created_assets, cognite_client=self.assets_api._cognite_client)
return created_assets

def _queue_tasks(
self,
Expand Down Expand Up @@ -1236,7 +1242,7 @@ def _insert(
successful = list(map(Asset._load, resp.json()["items"]))
return _TaskResult(successful, failed=[], unknown=[])
except Exception as err:
self.latest_exception = err
self._set_latest_exception(err)
successful = []
failed: list[Asset] = []
unknown: list[Asset] = []
Expand Down Expand Up @@ -1266,31 +1272,28 @@ def _insert(
# If update went well: Add to list of successful assets and remove from "bad":
if updated is not None:
successful.extend(updated)
still_bad = set(bad_assets).difference(updated)
updated_xids = set(upd.external_id for upd in updated)
still_bad = [bad for bad in bad_assets if bad.external_id not in updated_xids]
bad_assets.clear()
bad_assets.extend(still_bad)

return _TaskResult(successful, failed, unknown)

def _update(self, to_update: list[Asset], upsert_mode: Literal["patch", "replace"]) -> list[Asset] | None:
if upsert_mode == "patch":
updates = [self._make_asset_updates(asset, patch=True) for asset in to_update]
elif upsert_mode == "replace":
updates = [self._make_asset_updates(asset, patch=False) for asset in to_update]
else:
raise ValueError(f"'upsert_mode' must be either 'patch' or 'replace', not {upsert_mode!r}")
is_patch = upsert_mode == "patch"
updates = [self._make_asset_updates(asset, patch=is_patch) for asset in to_update]
return self._update_post(updates)

def _update_post(self, items: list[AssetUpdate]) -> list[Asset] | None:
try:
resp = self.assets_api._post(self.resource_path + "/update", json=self._dump_assets(items))
updated = [Asset._load(item) for item in resp.json()["items"]]
self.latest_exception = None # Update worked, so we hide exception
self._set_latest_exception(None) # Update worked, so we hide exception
return updated
except Exception as err:
# At this point, we don't care what caused the failure (well, we store the error to show the user):
# All assets that failed the update are already marked as either failed or unknown.
self.latest_exception = err
self._set_latest_exception(err)
return None

def _make_asset_updates(self, asset: Asset, patch: bool) -> AssetUpdate:
Expand All @@ -1311,6 +1314,10 @@ def _make_asset_updates(self, asset: Asset, patch: bool) -> AssetUpdate:
upd._update_object = dct_update
return upd

def _set_latest_exception(self, err: Exception | None) -> None:
thread_id = threading.get_ident()
self.latest_exception[thread_id] = err

@cached_property
def clear_all_update(self) -> MappingProxyType[str, dict[str, Any]]:
props = {to_camel_case(prop.name) for prop in AssetUpdate._get_update_properties()}
Expand Down Expand Up @@ -1402,27 +1409,28 @@ def _skip_all_descendants(
skip_assets = list(self._pop_child_assets(skip_assets, insert_dct))
self.failed.extend(skip_assets)

def _raise_latest_exception(self, successful: list[Asset]) -> NoReturn:
def _raise_latest_exception(self, exceptions: list[Exception], successful: list[Asset]) -> NoReturn:
*_, latest_exception = exceptions
common = dict(
successful=AssetList(successful),
unknown=AssetList(self.unknown),
failed=AssetList(self.failed),
unwrap_fn=op.attrgetter("external_id"),
)
err_message = "One or more errors happened during asset creation. Latest error:"
if isinstance(self.latest_exception, CogniteAPIError):
if isinstance(latest_exception, CogniteAPIError):
raise CogniteAPIError(
message=f"{err_message} {self.latest_exception.message}",
x_request_id=self.latest_exception.x_request_id,
code=self.latest_exception.code,
extra=self.latest_exception.extra,
message=f"{err_message} {latest_exception.message}",
x_request_id=latest_exception.x_request_id,
code=latest_exception.code,
extra=latest_exception.extra,
**common, # type: ignore [arg-type]
)
# If a non-Cognite-exception was raised, we still raise CogniteAPIError, but use 'from' to not hide
# the underlying reason from the user. We also do this because we promise that 'successful', 'unknown'
# and 'failed' can be inspected:
raise CogniteAPIError(
message=f"{err_message} {type(self.latest_exception).__name__}('{self.latest_exception}')",
message=f"{err_message} {type(latest_exception).__name__}('{latest_exception}')",
code=None, # type: ignore [arg-type]
**common, # type: ignore [arg-type]
) from self.latest_exception
) from latest_exception
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "6.24.0"
__version__ = "6.24.1"
__api_subversion__ = "V20220125"
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "6.24.0"
version = "6.24.1"

description = "Cognite Python SDK"
readme = "README.md"
Expand Down
20 changes: 18 additions & 2 deletions tests/tests_integration/test_api/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,10 @@ def set_create_lim(cognite_client):
yield


@pytest.mark.usefixtures("set_create_lim")
class TestAssetsAPICreateHierarchy:
@pytest.mark.parametrize("n_roots", (0, 1, 4))
def test_variable_number_of_root_assets(self, cognite_client, n_roots, root_test_asset, set_create_lim):
def test_variable_number_of_root_assets(self, cognite_client, n_roots, root_test_asset):
s = random_string(10)
assets = []
for i in range(n_roots):
Expand All @@ -451,7 +452,7 @@ def test_variable_number_of_root_assets(self, cognite_client, n_roots, root_test
),
)
def test_orphans__parent_linked_using_mixed_ids_xids(
self, n_id, n_xid, pass_hierarchy, cognite_client, root_test_asset_subtree, set_create_lim
self, n_id, n_xid, pass_hierarchy, cognite_client, root_test_asset_subtree
):
assets = generate_orphan_assets(n_id, n_xid, sample_from=root_test_asset_subtree)
expected = set(AssetList(assets)._external_id_to_item)
Expand Down Expand Up @@ -526,6 +527,21 @@ def test_upsert_mode_false_doesnt_patch(self, cognite_client):
pytest.fail("Expected 409 API error: 'Asset id duplicated'")
assert err.value.code == 409

def test_upsert_mode__only_first_batch_is_updated(self, cognite_client):
# SDK 6.24.0 and earlier versions had a bug when using upsert that could lead to only the first
# _CREATE_LIMIT updated assets being returned.
assets = AssetList(create_asset_tower(10))
expected_xids = set(assets.as_external_ids())
created = cognite_client.assets.create_hierarchy(assets, upsert=False)
assert set(created.as_external_ids()) == expected_xids

for a in assets:
a.description = "updated <3"

with create_hierarchy_with_cleanup(cognite_client, assets, upsert=True, upsert_mode="patch") as updated:
assert set(updated.as_external_ids()) == expected_xids
assert all(upd.description == "updated <3" for upd in updated)

def test_upsert_mode_with_replace(self, cognite_client):
assets = create_asset_tower(5)
created = cognite_client.assets.create_hierarchy(assets, upsert=False)
Expand Down

0 comments on commit 9d88fd9

Please sign in to comment.