Skip to content

Commit

Permalink
Instances: Include typing (#1691)
Browse files Browse the repository at this point in the history
  • Loading branch information
doctrino authored Jul 12, 2024
1 parent 4ad52b8 commit 5dd7801
Show file tree
Hide file tree
Showing 10 changed files with 417 additions and 34 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.54.0] - 2024-07-12
### Added
- In the `client.data_modeling.instances` the methods `.search`, `.retrieve`,`.list`, `.query`, and `.sync` now
support the `include_typing` parameter. This parameter is used to include typing information in the response,
that can be accessed via the `.typing` attribute on the result object.

## [7.53.4] - 2024-07-11
### Added
- `FilesAPI.upload_bytes` and `FilesAPI.upload` are updated to be compatible with Private Link projects.
Expand Down
100 changes: 80 additions & 20 deletions cognite/client/_api/data_modeling/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
T_Edge,
T_Node,
TargetUnit,
TypeInformation,
)
from cognite.client.data_classes.data_modeling.query import (
Query,
Expand Down Expand Up @@ -111,11 +112,21 @@ def __init__(self, instance_cls: type) -> None:
self._list_cls = NodeList if issubclass(instance_cls, TypedNode) else EdgeList

def __call__(self, items: Any, cognite_client: CogniteClient | None = None) -> Any:
return self._list_cls(items, cognite_client)
return self._list_cls(items, None, cognite_client)

def _load(self, data: str | dict, cognite_client: CogniteClient | None = None) -> T_Node | T_Edge:
data = load_yaml_or_json(data) if isinstance(data, str) else data
return self._list_cls([self._instance_cls._load(item) for item in data], cognite_client) # type: ignore[return-value, attr-defined]
return self._list_cls([self._instance_cls._load(item) for item in data], None, cognite_client) # type: ignore[return-value, attr-defined]

@classmethod
def _load_raw_api_response(self, responses: list[dict[str, Any]], cognite_client: CogniteClient) -> T_Node | T_Edge:
typing = next((TypeInformation._load(resp["typing"]) for resp in responses if "typing" in resp), None)
resources = [
self._instance_cls._load(item, cognite_client=cognite_client) # type: ignore[attr-defined]
for response in responses
for item in response["items"]
]
return self._list_cls(resources, typing, cognite_client=cognite_client) # type: ignore[return-value]


class _NodeOrEdgeApplyResultList(CogniteResourceList):
Expand Down Expand Up @@ -248,18 +259,29 @@ def __call__(
resource_cls, list_cls = _NodeOrEdgeResourceAdapter, EdgeList
else:
raise ValueError(f"Invalid instance type: {instance_type}")

return cast(
Union[Iterator[Edge], Iterator[EdgeList], Iterator[Node], Iterator[NodeList]],
self._list_generator(
list_cls=list_cls,
resource_cls=resource_cls,
if not include_typing:
return cast(
Union[Iterator[Edge], Iterator[EdgeList], Iterator[Node], Iterator[NodeList]],
self._list_generator(
list_cls=list_cls,
resource_cls=resource_cls,
method="POST",
chunk_size=chunk_size,
limit=limit,
filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
other_params=other_params,
),
)
return (
list_cls._load_raw_api_response([raw], self._cognite_client) # type: ignore[attr-defined]
for raw in self._list_generator_raw_responses(
method="POST",
settings_forcing_raw_response_loading=[f"{include_typing=}"],
chunk_size=chunk_size,
limit=limit,
filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
other_params=other_params,
),
)
)

def __iter__(self) -> Iterator[Node]:
Expand Down Expand Up @@ -550,6 +572,15 @@ def _retrieve_typed(
class _NodeOrEdgeList(CogniteResourceList):
_RESOURCE = (node_cls, edge_cls) # type: ignore[assignment]

def __init__(
self,
resources: list[Node | Edge],
typing: TypeInformation | None,
cognite_client: CogniteClient | None,
):
super().__init__(resources, cognite_client)
self.typing = typing

@classmethod
def _load(
cls, resource_list: Iterable[dict[str, Any]], cognite_client: CogniteClient | None = None
Expand All @@ -558,19 +589,32 @@ def _load(
node_cls._load(data) if data["instanceType"] == "node" else edge_cls._load(data)
for data in resource_list
]
return cls(resources, None)
return cls(resources, None, None)

@classmethod
def _load_raw_api_response(
cls, responses: list[dict[str, Any]], cognite_client: CogniteClient
) -> _NodeOrEdgeList:
typing = next((TypeInformation._load(resp["typing"]) for resp in responses if "typing" in resp), None)
resources = [
node_cls._load(data) if data["instanceType"] == "node" else edge_cls._load(data)
for response in responses
for data in response["items"]
]
return cls(resources, typing, cognite_client) # type: ignore[arg-type]

res = self._retrieve_multiple( # type: ignore[call-overload]
list_cls=_NodeOrEdgeList,
resource_cls=_NodeOrEdgeResourceAdapter(node_cls, edge_cls),
identifiers=identifiers,
other_params=other_params,
executor=ConcurrencySettings.get_data_modeling_executor(),
settings_forcing_raw_response_loading=[f"{include_typing=}"] if include_typing else None,
)

return InstancesResult[T_Node, T_Edge](
nodes=NodeList([node for node in res if isinstance(node, Node)]),
edges=EdgeList([edge for edge in res if isinstance(edge, Edge)]),
nodes=NodeList([node for node in res if isinstance(node, Node)], typing=res.typing),
edges=EdgeList([edge for edge in res if isinstance(edge, Edge)], typing=res.typing),
)

@staticmethod
Expand Down Expand Up @@ -935,6 +979,7 @@ def search(
target_units: list[TargetUnit] | None = None,
space: str | SequenceNotStr[str] | None = None,
filter: Filter | dict[str, Any] | None = None,
include_typing: bool = False,
limit: int = DEFAULT_LIMIT_READ,
sort: Sequence[InstanceSort | dict] | InstanceSort | dict | None = None,
) -> NodeList[Node]: ...
Expand All @@ -949,6 +994,7 @@ def search(
target_units: list[TargetUnit] | None = None,
space: str | SequenceNotStr[str] | None = None,
filter: Filter | dict[str, Any] | None = None,
include_typing: bool = False,
limit: int = DEFAULT_LIMIT_READ,
sort: Sequence[InstanceSort | dict] | InstanceSort | dict | None = None,
) -> EdgeList[Edge]: ...
Expand All @@ -963,6 +1009,7 @@ def search(
target_units: list[TargetUnit] | None = None,
space: str | SequenceNotStr[str] | None = None,
filter: Filter | dict[str, Any] | None = None,
include_typing: bool = False,
limit: int = DEFAULT_LIMIT_READ,
sort: Sequence[InstanceSort | dict] | InstanceSort | dict | None = None,
) -> NodeList[T_Node]: ...
Expand All @@ -977,6 +1024,7 @@ def search(
target_units: list[TargetUnit] | None = None,
space: str | SequenceNotStr[str] | None = None,
filter: Filter | dict[str, Any] | None = None,
include_typing: bool = False,
limit: int = DEFAULT_LIMIT_READ,
sort: Sequence[InstanceSort | dict] | InstanceSort | dict | None = None,
) -> EdgeList[T_Edge]: ...
Expand All @@ -990,6 +1038,7 @@ def search(
target_units: list[TargetUnit] | None = None,
space: str | SequenceNotStr[str] | None = None,
filter: Filter | dict[str, Any] | None = None,
include_typing: bool = False,
limit: int = DEFAULT_LIMIT_READ,
sort: Sequence[InstanceSort | dict] | InstanceSort | dict | None = None,
) -> NodeList[T_Node] | EdgeList[T_Edge]:
Expand All @@ -1003,6 +1052,7 @@ def search(
target_units (list[TargetUnit] | None): Properties to convert to another unit. The API can only convert to another unit if a unit has been defined as part of the type on the underlying container being queried.
space (str | SequenceNotStr[str] | None): Restrict instance search to the given space (or list of spaces).
filter (Filter | dict[str, Any] | None): Advanced filtering of instances.
include_typing (bool): Whether to include typing information.
limit (int): Maximum number of instances to return. Defaults to 25.
sort (Sequence[InstanceSort | dict] | InstanceSort | dict | None): How you want the listed instances information ordered.
Expand Down Expand Up @@ -1056,6 +1106,8 @@ def search(
body = {"view": view.dump(camel_case=True), "query": query, "instanceType": instance_type_str, "limit": limit}
if properties:
body["properties"] = properties
if include_typing:
body["includeTyping"] = include_typing
if filter:
body["filter"] = filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter
if target_units:
Expand All @@ -1069,8 +1121,9 @@ def search(
body["sort"] = [self._dump_instance_sort(s) for s in sorts]

res = self._post(url_path=self._RESOURCE_PATH + "/search", json=body)
items = res.json()["items"]
return list_cls([resource_cls._load(item) for item in items], cognite_client=None)
result = res.json()
typing = TypeInformation._load(result["typing"]) if "typing" in result else None
return list_cls([resource_cls._load(item) for item in result["items"]], typing, cognite_client=None)

@overload
def aggregate(
Expand Down Expand Up @@ -1296,14 +1349,15 @@ def histogram(
else:
return [HistogramValue.load(item["aggregates"][0]) for item in res.json()["items"]]

def query(self, query: Query) -> QueryResult:
def query(self, query: Query, include_typing: bool = False) -> QueryResult:
"""`Advanced query interface for nodes/edges. <https://developer.cognite.com/api/v1/#tag/Instances/operation/queryContent>`_
The Data Modelling API exposes an advanced query interface. The query interface supports parameterization,
recursive edge traversal, chaining of result sets, and granular property selection.
Args:
query (Query): Query.
include_typing (bool): Should we return property type information as part of the result?
Returns:
QueryResult: The resulting nodes and/or edges from the query.
Expand Down Expand Up @@ -1332,15 +1386,16 @@ def query(self, query: Query) -> QueryResult:
... )
>>> res = client.data_modeling.instances.query(query)
"""
return self._query_or_sync(query, "query")
return self._query_or_sync(query, "query", include_typing)

def sync(self, query: Query) -> QueryResult:
def sync(self, query: Query, include_typing: bool = False) -> QueryResult:
"""`Subscription to changes for nodes/edges. <https://developer.cognite.com/api/v1/#tag/Instances/operation/syncContent>`_
Subscribe to changes for nodes and edges in a project, matching a supplied filter.
Args:
query (Query): Query.
include_typing (bool): Should we return property type information as part of the result?
Returns:
QueryResult: The resulting nodes and/or edges from the query.
Expand Down Expand Up @@ -1375,16 +1430,20 @@ def sync(self, query: Query) -> QueryResult:
In the last example, the res_new will only contain the actors that have been added with the new movie.
"""
return self._query_or_sync(query, "sync")
return self._query_or_sync(query, "sync", include_typing=include_typing)

def _query_or_sync(self, query: Query, endpoint: Literal["query", "sync"]) -> QueryResult:
def _query_or_sync(self, query: Query, endpoint: Literal["query", "sync"], include_typing: bool) -> QueryResult:
body = query.dump(camel_case=True)
if include_typing:
body["includeTyping"] = include_typing

result = self._post(url_path=self._RESOURCE_PATH + f"/{endpoint}", json=body)

json_payload = result.json()
default_by_reference = query.instance_type_by_result_expression()
results = QueryResult.load(json_payload["items"], default_by_reference, json_payload["nextCursor"])
results = QueryResult.load(
json_payload["items"], default_by_reference, json_payload["nextCursor"], json_payload.get("typing")
)

return results

Expand Down Expand Up @@ -1526,6 +1585,7 @@ def list(
limit=limit,
filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
other_params=other_params,
settings_forcing_raw_response_loading=[f"{include_typing=}"] if include_typing else [],
),
)

Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.53.4"
__version__ = "7.54.0"
__api_subversion__ = "20230101"
11 changes: 11 additions & 0 deletions cognite/client/data_classes/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,17 @@ def _load_raw_api_response(cls, responses: list[dict[str, Any]], cognite_client:
# an implementation of this method
raise NotImplementedError

def dump_raw(self, camel_case: bool = True) -> dict[str, Any]:
"""This method dumps the list with extra information in addition to the items.
Args:
camel_case (bool): Use camelCase for attribute names. Defaults to True.
Returns:
dict[str, Any]: A dictionary representation of the list.
"""
return {"items": [resource.dump(camel_case) for resource in self.data]}


T_CogniteResourceList = TypeVar("T_CogniteResourceList", bound=CogniteResourceList)

Expand Down
Loading

0 comments on commit 5dd7801

Please sign in to comment.