From 9d88fd9deaea1bf73b671fa03cff14b6ec77afc6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?=
Date: Thu, 14 Sep 2023 20:31:00 +0200
Subject: [PATCH] Bugfix asset hierarchy upsert (#1368)

---
 CHANGELOG.md                                   |  5 ++
 cognite/client/_api/assets.py                  | 58 +++++++++++--------
 cognite/client/_version.py                     |  2 +-
 pyproject.toml                                 |  2 +-
 .../tests_integration/test_api/test_assets.py  | 20 ++++++-
 5 files changed, 58 insertions(+), 29 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 905e351980..822a0ce541 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,11 @@ Changes are grouped as follows
 - `Fixed` for any bug fixes.
 - `Security` in case of vulnerabilities.
 
+## [6.24.1] - 2023-09-13
+### Fixed
+- Bugfix for `AssetsAPI.create_hierarchy` when running in upsert mode: It could skip certain updates above
+  the single-request create limit (currently 1000 assets).
+
 ## [6.24.0] - 2023-09-12
 ### Fixed
 - Bugfix for `FilesAPI.upload` and `FilesAPI.upload_bytes` not raising an error on file contents upload failure. Now `CogniteFileUploadError` is raised based on upload response.
diff --git a/cognite/client/_api/assets.py b/cognite/client/_api/assets.py
index 6363247f89..cec0351f63 100644
--- a/cognite/client/_api/assets.py
+++ b/cognite/client/_api/assets.py
@@ -5,6 +5,7 @@
 import itertools
 import math
 import operator as op
+import threading
 import warnings
 from functools import cached_property
 from types import MappingProxyType
@@ -703,6 +704,9 @@ def create_hierarchy(
                 ... else:
                 ...     hierarchy.validate_and_report(output_file=Path("report.txt"))
         """
+        if upsert and upsert_mode not in ("patch", "replace"):
+            raise ValueError(f"'upsert_mode' must be either 'patch' or 'replace', not {upsert_mode!r}")
+
         if not isinstance(assets, AssetHierarchy):
             utils._auxiliary.assert_type(assets, "assets", [Sequence])
             assets = AssetHierarchy(assets, ignore_orphans=True)
@@ -1157,7 +1161,8 @@ def __init__(self, hierarchy: AssetHierarchy, assets_api: AssetsAPI) -> None:
         self.max_workers = assets_api._config.max_workers
         self.failed: list[Asset] = []
         self.unknown: list[Asset] = []
-        self.latest_exception: Exception | None = None
+        # Each thread needs to store its latest exception:
+        self.latest_exception: dict[int, Exception | None] = {}
 
         self.__counter = itertools.count().__next__
 
@@ -1167,7 +1172,11 @@ def create(self, upsert: bool, upsert_mode: Literal["patch", "replace"]) -> Asse
         subtree_count = self.hierarchy.count_subtree(insert_dct)
 
         with get_priority_executor(max_workers=self.max_workers) as pool:
-            return self._create(pool, insert_fn, insert_dct, subtree_count)
+            created_assets = self._create(pool, insert_fn, insert_dct, subtree_count)
+
+        if all_exceptions := [exc for exc in self.latest_exception.values() if exc is not None]:
+            self._raise_latest_exception(all_exceptions, created_assets)
+        return AssetList(created_assets, cognite_client=self.assets_api._cognite_client)
 
     def _create(
         self,
@@ -1175,7 +1184,7 @@ def _create(
         insert_fn: Callable[[list[Asset]], _TaskResult],
         insert_dct: dict[str | None, list[Asset]],
         subtree_count: dict[str, int],
-    ) -> AssetList:
+    ) -> list[Asset]:
         queue_fn = functools.partial(
             self._queue_tasks,
             pool=pool,
@@ -1202,10 +1211,7 @@ def _create(
                 # Newly created assets are now unblocked as parents for the next iteration:
                 to_create = list(self._pop_child_assets(new_assets, insert_dct))
                 futures |= queue_fn(to_create)
-
-        if self.latest_exception is not None:
-            self._raise_latest_exception(created_assets)
-        return AssetList(created_assets, cognite_client=self.assets_api._cognite_client)
+        return created_assets
 
     def _queue_tasks(
         self,
@@ -1236,7 +1242,7 @@ def _insert(
             successful = list(map(Asset._load, resp.json()["items"]))
             return _TaskResult(successful, failed=[], unknown=[])
         except Exception as err:
-            self.latest_exception = err
+            self._set_latest_exception(err)
             successful = []
             failed: list[Asset] = []
             unknown: list[Asset] = []
@@ -1266,31 +1272,28 @@ def _insert(
             # If update went well: Add to list of successful assets and remove from "bad":
             if updated is not None:
                 successful.extend(updated)
-                still_bad = set(bad_assets).difference(updated)
+                updated_xids = set(upd.external_id for upd in updated)
+                still_bad = [bad for bad in bad_assets if bad.external_id not in updated_xids]
                 bad_assets.clear()
                 bad_assets.extend(still_bad)
 
         return _TaskResult(successful, failed, unknown)
 
     def _update(self, to_update: list[Asset], upsert_mode: Literal["patch", "replace"]) -> list[Asset] | None:
-        if upsert_mode == "patch":
-            updates = [self._make_asset_updates(asset, patch=True) for asset in to_update]
-        elif upsert_mode == "replace":
-            updates = [self._make_asset_updates(asset, patch=False) for asset in to_update]
-        else:
-            raise ValueError(f"'upsert_mode' must be either 'patch' or 'replace', not {upsert_mode!r}")
+        is_patch = upsert_mode == "patch"
+        updates = [self._make_asset_updates(asset, patch=is_patch) for asset in to_update]
         return self._update_post(updates)
 
     def _update_post(self, items: list[AssetUpdate]) -> list[Asset] | None:
         try:
             resp = self.assets_api._post(self.resource_path + "/update", json=self._dump_assets(items))
             updated = [Asset._load(item) for item in resp.json()["items"]]
-            self.latest_exception = None  # Update worked, so we hide exception
+            self._set_latest_exception(None)  # Update worked, so we hide exception
             return updated
         except Exception as err:
             # At this point, we don't care what caused the failure (well, we store error to show the user):
             # All assets that failed the update are already marked as either failed or unknown.
-            self.latest_exception = err
+            self._set_latest_exception(err)
             return None
 
     def _make_asset_updates(self, asset: Asset, patch: bool) -> AssetUpdate:
@@ -1311,6 +1314,10 @@ def _make_asset_updates(self, asset: Asset, patch: bool) -> AssetUpdate:
         upd._update_object = dct_update
         return upd
 
+    def _set_latest_exception(self, err: Exception | None) -> None:
+        thread_id = threading.get_ident()
+        self.latest_exception[thread_id] = err
+
     @cached_property
     def clear_all_update(self) -> MappingProxyType[str, dict[str, Any]]:
         props = {to_camel_case(prop.name) for prop in AssetUpdate._get_update_properties()}
@@ -1402,7 +1409,8 @@ def _skip_all_descendants(
             skip_assets = list(self._pop_child_assets(skip_assets, insert_dct))
         self.failed.extend(skip_assets)
 
-    def _raise_latest_exception(self, successful: list[Asset]) -> NoReturn:
+    def _raise_latest_exception(self, exceptions: list[Exception], successful: list[Asset]) -> NoReturn:
+        *_, latest_exception = exceptions
         common = dict(
             successful=AssetList(successful),
             unknown=AssetList(self.unknown),
@@ -1410,19 +1418,19 @@ def _raise_latest_exception(self, successful: list[Asset]) -> NoReturn:
             unwrap_fn=op.attrgetter("external_id"),
         )
         err_message = "One or more errors happened during asset creation. Latest error:"
-        if isinstance(self.latest_exception, CogniteAPIError):
+        if isinstance(latest_exception, CogniteAPIError):
             raise CogniteAPIError(
-                message=f"{err_message} {self.latest_exception.message}",
-                x_request_id=self.latest_exception.x_request_id,
-                code=self.latest_exception.code,
-                extra=self.latest_exception.extra,
+                message=f"{err_message} {latest_exception.message}",
+                x_request_id=latest_exception.x_request_id,
+                code=latest_exception.code,
+                extra=latest_exception.extra,
                 **common,  # type: ignore [arg-type]
             )
         # If a non-Cognite-exception was raised, we still raise CogniteAPIError, but use 'from' to not hide
         # the underlying reason from the user. We also do this because we promise that 'successful', 'unknown'
         # and 'failed' can be inspected:
         raise CogniteAPIError(
-            message=f"{err_message} {type(self.latest_exception).__name__}('{self.latest_exception}')",
+            message=f"{err_message} {type(latest_exception).__name__}('{latest_exception}')",
             code=None,  # type: ignore [arg-type]
             **common,  # type: ignore [arg-type]
-        ) from self.latest_exception
+        ) from latest_exception
diff --git a/cognite/client/_version.py b/cognite/client/_version.py
index f04e4315fd..dde3c4639b 100644
--- a/cognite/client/_version.py
+++ b/cognite/client/_version.py
@@ -1,4 +1,4 @@
 from __future__ import annotations
 
-__version__ = "6.24.0"
+__version__ = "6.24.1"
 __api_subversion__ = "V20220125"
diff --git a/pyproject.toml b/pyproject.toml
index 6942844935..aa82a96b0e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,7 +1,7 @@
 [tool.poetry]
 name = "cognite-sdk"
 
-version = "6.24.0"
+version = "6.24.1"
 
 description = "Cognite Python SDK"
 readme = "README.md"
diff --git a/tests/tests_integration/test_api/test_assets.py b/tests/tests_integration/test_api/test_assets.py
index 732b125dee..9de9306b00 100644
--- a/tests/tests_integration/test_api/test_assets.py
+++ b/tests/tests_integration/test_api/test_assets.py
@@ -422,9 +422,10 @@ def set_create_lim(cognite_client):
     yield
 
 
+@pytest.mark.usefixtures("set_create_lim")
 class TestAssetsAPICreateHierarchy:
     @pytest.mark.parametrize("n_roots", (0, 1, 4))
-    def test_variable_number_of_root_assets(self, cognite_client, n_roots, root_test_asset, set_create_lim):
+    def test_variable_number_of_root_assets(self, cognite_client, n_roots, root_test_asset):
         s = random_string(10)
         assets = []
         for i in range(n_roots):
@@ -451,7 +452,7 @@ def test_variable_number_of_root_assets(self, cognite_client, n_roots, root_test
         ),
     )
     def test_orphans__parent_linked_using_mixed_ids_xids(
-        self, n_id, n_xid, pass_hierarchy, cognite_client, root_test_asset_subtree, set_create_lim
+        self, n_id, n_xid, pass_hierarchy, cognite_client, root_test_asset_subtree
     ):
         assets = generate_orphan_assets(n_id, n_xid, sample_from=root_test_asset_subtree)
         expected = set(AssetList(assets)._external_id_to_item)
@@ -526,6 +527,21 @@ def test_upsert_mode_false_doesnt_patch(self, cognite_client):
             pytest.fail("Expected 409 API error: 'Asset id duplicated'")
         assert err.value.code == 409
 
+    def test_upsert_mode__only_first_batch_is_updated(self, cognite_client):
+        # SDK 6.24.0 and earlier versions had a bug when using upsert that could lead to only the first
+        # _CREATE_LIMIT number of assets (updated) being returned.
+        assets = AssetList(create_asset_tower(10))
+        expected_xids = set(assets.as_external_ids())
+        created = cognite_client.assets.create_hierarchy(assets, upsert=False)
+        assert set(created.as_external_ids()) == expected_xids
+
+        for a in assets:
+            a.description = "updated <3"
+
+        with create_hierarchy_with_cleanup(cognite_client, assets, upsert=True, upsert_mode="patch") as updated:
+            assert set(updated.as_external_ids()) == expected_xids
+            assert all(upd.description == "updated <3" for upd in updated)
+
     def test_upsert_mode_with_replace(self, cognite_client):
         assets = create_asset_tower(5)
         created = cognite_client.assets.create_hierarchy(assets, upsert=False)
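Below is a minimal usage sketch (not part of the commit) of the upsert flow this patch fixes. The client configuration and the asset names/external IDs are assumed placeholders; `create_hierarchy`, the `upsert`/`upsert_mode` arguments and the `successful`/`unknown`/`failed` attributes on `CogniteAPIError` are taken from the code above.

from cognite.client import CogniteClient
from cognite.client.data_classes import Asset
from cognite.client.exceptions import CogniteAPIError

# Assumes project/credentials are already configured (e.g. via environment variables).
client = CogniteClient()

# Illustrative two-level hierarchy; external IDs are placeholders.
assets = [
    Asset(external_id="plant", name="plant"),
    Asset(external_id="pump-a", parent_external_id="plant", name="pump-a", description="updated <3"),
]

try:
    # With upsert=True, assets that already exist are updated instead of failing as duplicates;
    # "patch" only updates the fields set on the passed assets, "replace" also clears the rest.
    created = client.assets.create_hierarchy(assets, upsert=True, upsert_mode="patch")
except CogniteAPIError as err:
    # A failure in any worker thread is re-raised here, while partial results stay inspectable:
    print(err.successful, err.unknown, err.failed)
else:
    print(created.as_external_ids())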