Skip to content

Commit

Permalink
Improve aggregated properties for AssetsAPI (#1558)
Browse files Browse the repository at this point in the history
  • Loading branch information
haakonvt authored Dec 21, 2023
1 parent 35eeac5 commit 557248f
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 26 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.8.0] - 2023-12-20
## [7.8.1] - 2023-12-21
### Fixed
- Calling `to_pandas` with `expand_aggregates=True` on an Asset with aggregated properties would yield a pandas DataFrame
with the column name `0` instead of `"value"`.
### Improved
- Specification of aggregated properties to `AssetsAPI.[list,filter,__call__]`.

## [7.8.0] - 2023-12-21
### Added
- Instance classes `Node`, `Edge`, `NodeList` and `EdgeList` now supports a new flag `expand_properties` in their `to_pandas` method,
that makes it much simpler to work with the fetched properties. Additionally, `remove_property_prefix` allows easy prefix
Expand Down
41 changes: 20 additions & 21 deletions cognite/client/_api/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@

as_completed = import_as_completed()

AggregateAssetProperty: TypeAlias = Literal["child_count", "path", "depth"]

SortSpec: TypeAlias = Union[
AssetSort,
str,
Expand Down Expand Up @@ -110,7 +112,7 @@ def __call__(
last_updated_time: TimestampRange | dict[str, Any] | None = None,
root: bool | None = None,
external_id_prefix: str | None = None,
aggregated_properties: Sequence[str] | None = None,
aggregated_properties: Sequence[AggregateAssetProperty] | None = None,
limit: int | None = None,
partitions: int | None = None,
) -> Iterator[Asset] | Iterator[AssetList]:
Expand All @@ -135,16 +137,14 @@ def __call__(
last_updated_time (TimestampRange | dict[str, Any] | None): Range between two timestamps. Possible keys are `min` and `max`, with values given as time stamps in ms.
root (bool | None): filtered assets are root assets or not
external_id_prefix (str | None): Filter by this (case-sensitive) prefix for the external ID.
aggregated_properties (Sequence[str] | None): Set of aggregated properties to include.
aggregated_properties (Sequence[AggregateAssetProperty] | None): Set of aggregated properties to include. Options are childCount, path, depth.
limit (int | None): Maximum number of assets to return. Defaults to return all items.
partitions (int | None): Retrieve assets in parallel using this number of workers. Also requires `limit=None` to be passed. To prevent unexpected problems and maximize read throughput, API documentation recommends at most use 10 partitions. When using more than 10 partitions, actual throughout decreases. In future releases of the APIs, CDF may reject requests with more than 10 partitions.
Returns:
Iterator[Asset] | Iterator[AssetList]: yields Asset one by one if chunk_size is not specified, else AssetList objects.
"""
if aggregated_properties:
aggregated_properties = [to_camel_case(s) for s in aggregated_properties]

agg_props = self._process_aggregated_props(aggregated_properties)
asset_subtree_ids_processed = process_asset_subtree_ids(asset_subtree_ids, asset_subtree_external_ids)
data_set_ids_processed = process_data_set_ids(data_set_ids, data_set_external_ids)

Expand Down Expand Up @@ -172,7 +172,7 @@ def __call__(
filter=filter,
limit=limit,
partitions=partitions,
other_params={"aggregatedProperties": aggregated_properties} if aggregated_properties else {},
other_params=agg_props,
)

def __iter__(self) -> Iterator[Asset]:
Expand Down Expand Up @@ -818,7 +818,7 @@ def filter(
self,
filter: Filter | dict,
sort: SortSpec | list[SortSpec] | None = None,
aggregated_properties: Sequence[Literal["child_count", "path", "depth"]] | None = None,
aggregated_properties: Sequence[AggregateAssetProperty] | None = None,
limit: int | None = DEFAULT_LIMIT_READ,
) -> AssetList:
"""`Advanced filter assets <https://developer.cognite.com/api#tag/Assets/operation/listAssets>`_
Expand All @@ -830,7 +830,7 @@ def filter(
Args:
filter (Filter | dict): Filter to apply.
sort (SortSpec | list[SortSpec] | None): The criteria to sort by. Can be up to two properties to sort by default to ascending order.
aggregated_properties (Sequence[Literal["child_count", "path", "depth"]] | None): Set of aggregated properties to include. Options are childCount, path, depth.
aggregated_properties (Sequence[AggregateAssetProperty] | None): Set of aggregated properties to include. Options are childCount, path, depth.
limit (int | None): Maximum number of results to return. Defaults to 25. Set to -1, float("inf") or None to return all items.
Returns:
Expand Down Expand Up @@ -864,20 +864,15 @@ def filter(
"""
self._validate_filter(filter)

if aggregated_properties:
aggregated_properties_camel = [to_camel_case(prop) for prop in aggregated_properties]
else:
aggregated_properties_camel = None

agg_props = self._process_aggregated_props(aggregated_properties)
return self._list(
list_cls=AssetList,
resource_cls=Asset,
method="POST",
limit=limit,
advanced_filter=filter.dump(camel_case_property=True) if isinstance(filter, Filter) else filter,
sort=prepare_filter_sort(sort, AssetSort),
other_params={"aggregatedProperties": aggregated_properties_camel} if aggregated_properties_camel else {},
other_params=agg_props,
)

def _validate_filter(self, filter: Filter | dict | None) -> None:
Expand Down Expand Up @@ -981,6 +976,12 @@ def _get_children(self, assets: list) -> list:
children.extend(res)
return children

@staticmethod
def _process_aggregated_props(agg_props: Sequence[AggregateAssetProperty] | None) -> dict[str, list[str]]:
if not agg_props:
return {}
return {"aggregatedProperties": [to_camel_case(prop) for prop in agg_props]}

def list(
self,
name: str | None = None,
Expand All @@ -998,7 +999,7 @@ def list(
last_updated_time: dict[str, Any] | TimestampRange | None = None,
root: bool | None = None,
external_id_prefix: str | None = None,
aggregated_properties: Sequence[str] | None = None,
aggregated_properties: Sequence[AggregateAssetProperty] | None = None,
partitions: int | None = None,
limit: int | None = DEFAULT_LIMIT_READ,
) -> AssetList:
Expand All @@ -1020,7 +1021,7 @@ def list(
last_updated_time (dict[str, Any] | TimestampRange | None): Range between two timestamps. Possible keys are `min` and `max`, with values given as time stamps in ms.
root (bool | None): filtered assets are root assets or not.
external_id_prefix (str | None): Filter by this (case-sensitive) prefix for the external ID.
aggregated_properties (Sequence[str] | None): Set of aggregated properties to include.
aggregated_properties (Sequence[AggregateAssetProperty] | None): Set of aggregated properties to include. Options are childCount, path, depth.
partitions (int | None): Retrieve assets in parallel using this number of workers. Also requires `limit=None` to be passed. To prevent unexpected problems and maximize read throughput, API documentation recommends at most use 10 partitions. When using more than 10 partitions, actual throughout decreases. In future releases of the APIs, CDF may reject requests with more than 10 partitions.
limit (int | None): Maximum number of assets to return. Defaults to 25. Set to -1, float("inf") or None to return all items.
Expand Down Expand Up @@ -1057,9 +1058,7 @@ def list(
>>> my_label_filter = LabelFilter(contains_all=["PUMP", "VERIFIED"])
>>> asset_list = c.assets.list(labels=my_label_filter)
"""
if aggregated_properties:
aggregated_properties = [to_camel_case(s) for s in aggregated_properties]

agg_props = self._process_aggregated_props(aggregated_properties)
asset_subtree_ids_processed = process_asset_subtree_ids(asset_subtree_ids, asset_subtree_external_ids)
data_set_ids_processed = process_data_set_ids(data_set_ids, data_set_external_ids)

Expand All @@ -1084,7 +1083,7 @@ def list(
method="POST",
limit=limit,
filter=filter,
other_params={"aggregatedProperties": aggregated_properties} if aggregated_properties else {},
other_params=agg_props,
partitions=partitions,
)

Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.8.0"
__version__ = "7.8.1"
__api_subversion__ = "V20220125"
2 changes: 1 addition & 1 deletion cognite/client/data_classes/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def to_pandas( # type: ignore [override]
pd = local_import("pandas")
col = df.squeeze()
aggregates = convert_dict_to_case(col.pop("aggregates"), camel_case)
return pd.concat((col, pd.Series(aggregates).add_prefix(aggregates_prefix))).to_frame()
return pd.concat((col, pd.Series(aggregates).add_prefix(aggregates_prefix))).to_frame(name="value")


class AssetUpdate(CogniteUpdate):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.8.0"
version = "7.8.1"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
16 changes: 15 additions & 1 deletion tests/tests_unit/test_api/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import pytest

from cognite.client._api.assets import Asset, AssetList, AssetUpdate
from cognite.client.data_classes import AssetFilter, Label, LabelFilter, TimestampRange
from cognite.client.data_classes import AggregateResultItem, AssetFilter, Label, LabelFilter, TimestampRange
from cognite.client.exceptions import CogniteAPIError
from cognite.client.utils._text import convert_all_keys_to_snake_case
from tests.utils import jsgz_load

EXAMPLE_ASSET = {
Expand Down Expand Up @@ -344,6 +345,19 @@ def test_asset_to_pandas(self, cognite_client, mock_assets_response):
assert 1 == df.loc["id"][0]
assert "metadata-value" == df.loc["metadata-key"][0]

def test_expand_aggregates(self):
agg_props = {"childCount": 0, "depth": 4, "path": [{"id": 35927223}, {"id": 20283836}, {"id": 296}]}
asset = Asset(name="foo", aggregates=AggregateResultItem._load(agg_props))
expanded = asset.to_pandas(expand_aggregates=True)
not_expanded = asset.to_pandas(expand_aggregates=False)

assert expanded.columns == ["value"] # This was wrongly col=0 prior to 7.8.1
assert not_expanded.columns == ["value"]
assert convert_all_keys_to_snake_case(agg_props) == not_expanded.loc["aggregates"].item()
assert agg_props["childCount"] == expanded.loc["aggregates.child_count"].item()
assert agg_props["depth"] == expanded.loc["aggregates.depth"].item()
assert agg_props["path"] == expanded.loc["aggregates.path"].item()

# need subtree here to get list, since to_pandas on a single Asset gives int for id, but on AssetList it gives int64
def test_asset_id_from_to_pandas(self, cognite_client, mock_get_subtree):
df = cognite_client.assets.retrieve_subtree(id=1).to_pandas()
Expand Down

0 comments on commit 557248f

Please sign in to comment.