From 578825b3e75cb4710bfea9a0f3cbd5c7d1f70575 Mon Sep 17 00:00:00 2001 From: Anders Albert <60234212+doctrino@users.noreply.github.com> Date: Tue, 19 Nov 2024 17:11:39 +0100 Subject: [PATCH] Datapoint subscription Instance ID (#2033) --- CHANGELOG.md | 4 ++ .../client/_api/datapoints_subscriptions.py | 14 +++++ cognite/client/_version.py | 2 +- .../data_classes/datapoints_subscriptions.py | 49 +++++++++++++++-- pyproject.toml | 2 +- .../test_api/test_datapoint_subscriptions.py | 53 +++++++++++++++++-- 6 files changed, 112 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 159db290ba..e5d2031078 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## [7.67.2] - 2023-11-19 +### Added +- Instance ID is now supported for DatapointsSubscriptionsAPI (`client.time_series.subscriptions`) + ## [7.67.1] - 2024-11-19 ### Added - Workflow triggers support metadata field diff --git a/cognite/client/_api/datapoints_subscriptions.py b/cognite/client/_api/datapoints_subscriptions.py index 672ff87c3c..7f5a739cc0 100644 --- a/cognite/client/_api/datapoints_subscriptions.py +++ b/cognite/client/_api/datapoints_subscriptions.py @@ -85,6 +85,20 @@ def create(self, subscription: DataPointSubscriptionWrite) -> DatapointSubscript ... time_series_ids=["myFistTimeSeries", "mySecondTimeSeries"]) >>> created = client.time_series.subscriptions.create(sub) + Create a subscription with explicit time series IDs given as Node IDs + either from CogniteTimeSeries or an extension of CogniteTimeseries: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes import DataPointSubscriptionWrite + >>> from cognite.client.data_classes.data_modeling import NodeId + >>> client = CogniteClient() + >>> sub = DataPointSubscriptionWrite( + ... external_id="my_subscription", + ... name="My subscription with Data Model Ids", + ... partition_count=1, + ... instance_ids=[NodeId("my_space", "myFistTimeSeries"), NodeId("my_space", "mySecondTimeSeries")]) + >>> created = client.time_series.subscriptions.create(sub) + Create a filter defined subscription for all numeric time series that are stepwise: >>> from cognite.client import CogniteClient diff --git a/cognite/client/_version.py b/cognite/client/_version.py index 7e6ca53b4e..0d7d525a7b 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,4 @@ from __future__ import annotations -__version__ = "7.67.1" +__version__ = "7.67.2" __api_subversion__ = "20230101" diff --git a/cognite/client/data_classes/datapoints_subscriptions.py b/cognite/client/data_classes/datapoints_subscriptions.py index 67fd757fa3..e50880aec3 100644 --- a/cognite/client/data_classes/datapoints_subscriptions.py +++ b/cognite/client/data_classes/datapoints_subscriptions.py @@ -21,6 +21,7 @@ WriteableCogniteResource, WriteableCogniteResourceList, ) +from cognite.client.data_classes.data_modeling import NodeId from cognite.client.data_classes.filters import _BASIC_FILTERS as _FILTERS_SUPPORTED from cognite.client.data_classes.filters import Filter, _validate_filter from cognite.client.utils import _json @@ -125,6 +126,7 @@ class DataPointSubscriptionWrite(DatapointSubscriptionCore): external_id (str): Externally provided ID for the subscription. Must be unique. partition_count (int): The maximum effective parallelism of this subscription (the number of clients that can read from it concurrently) will be limited to this number, but a higher partition count will cause a higher time overhead. The partition count must be between 1 and 100. CAVEAT: This cannot change after the subscription has been created. time_series_ids (list[ExternalId] | None): List of (external) ids of time series that this subscription will listen to. Not compatible with filter. + instance_ids(list[NodeId] | None): List of instance ids of time series that this subscription will listen to. Not compatible with filter. filter (Filter | None): A filter DSL (Domain Specific Language) to define advanced filter queries. Not compatible with time_series_ids. name (str | None): No description. description (str | None): A summary explanation for the subscription. @@ -136,16 +138,18 @@ def __init__( external_id: str, partition_count: int, time_series_ids: list[ExternalId] | None = None, + instance_ids: list[NodeId] | None = None, filter: Filter | None = None, name: str | None = None, description: str | None = None, data_set_id: int | None = None, ) -> None: - if not exactly_one_is_not_none(time_series_ids, filter): - raise ValueError("Exactly one of time_series_ids and filter must be given") + if not exactly_one_is_not_none(time_series_ids or instance_ids, filter): + raise ValueError("Exactly one of time_series_ids/instance_ids or filter must be provided") _validate_filter(filter, _FILTERS_SUPPORTED, "DataPointSubscriptions") super().__init__(external_id, partition_count, filter, name, description, data_set_id) self.time_series_ids = time_series_ids + self.instance_ids = instance_ids @classmethod def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> Self: @@ -154,12 +158,19 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> S external_id=resource["externalId"], partition_count=resource["partitionCount"], time_series_ids=resource.get("timeSeriesIds"), + instance_ids=[NodeId.load(item) for item in resource["instanceIds"]] if "instanceIds" in resource else None, filter=filter, name=resource.get("name"), description=resource.get("description"), data_set_id=resource.get("dataSetId"), ) + def dump(self, camel_case: bool = True) -> dict[str, Any]: + data = super().dump(camel_case) + if self.instance_ids is not None: + data["instanceIds"] = [item.dump(include_instance_type=False) for item in self.instance_ids] + return data + def as_write(self) -> DataPointSubscriptionWrite: """Returns this DatapointSubscription instance""" return self @@ -197,6 +208,16 @@ def add(self, value: list) -> DataPointSubscriptionUpdate: def remove(self, value: list) -> DataPointSubscriptionUpdate: return self._remove(value) + class _ListDataPointSubscriptionNodeIdUpdate(CogniteListUpdate): + def set(self, value: list[NodeId]) -> DataPointSubscriptionUpdate: + return self._set([item.dump(include_instance_type=False) for item in value]) + + def add(self, value: list[NodeId]) -> DataPointSubscriptionUpdate: + return self._add([item.dump(include_instance_type=False) for item in value]) + + def remove(self, value: list[NodeId]) -> DataPointSubscriptionUpdate: + return self._remove([item.dump(include_instance_type=False) for item in value]) + @property def name(self) -> _PrimitiveDataPointSubscriptionUpdate: return DataPointSubscriptionUpdate._PrimitiveDataPointSubscriptionUpdate(self, "name") @@ -209,6 +230,10 @@ def data_set_id(self) -> _PrimitiveDataPointSubscriptionUpdate: def time_series_ids(self) -> _ListDataPointSubscriptionUpdate: return DataPointSubscriptionUpdate._ListDataPointSubscriptionUpdate(self, "timeSeriesIds") + @property + def instance_ids(self) -> _ListDataPointSubscriptionNodeIdUpdate: + return DataPointSubscriptionUpdate._ListDataPointSubscriptionNodeIdUpdate(self, "instanceIds") + @property def filter(self) -> _FilterDataPointSubscriptionUpdate: return DataPointSubscriptionUpdate._FilterDataPointSubscriptionUpdate(self, "filter") @@ -218,6 +243,7 @@ def _get_update_properties(cls, item: CogniteResource | None = None) -> list[Pro return [ PropertySpec("name"), PropertySpec("time_series_ids", is_list=True), + PropertySpec("instance_ids", is_list=True), PropertySpec("filter", is_nullable=False), PropertySpec("data_set_id"), ] @@ -230,23 +256,36 @@ class TimeSeriesID(CogniteResource): Args: id (int): A server-generated ID for the object. external_id (ExternalId | None): The external ID provided by the client. Must be unique for the resource type. + instance_id (NodeId | None): The ID of an instance in Cognite Data Models. """ - def __init__(self, id: int, external_id: ExternalId | None = None) -> None: + def __init__(self, id: int, external_id: ExternalId | None = None, instance_id: NodeId | None = None) -> None: self.id = id self.external_id = external_id + self.instance_id = instance_id def __repr__(self) -> str: - return f"TimeSeriesID(id={self.id}, external_id={self.external_id})" + identifier = f"id={self.id}" + if self.external_id is not None: + identifier += f", external_id={self.external_id}" + elif self.instance_id is not None: + identifier += f", instance_id={self.instance_id!r}" + return f"TimeSeriesID({identifier})" @classmethod def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> TimeSeriesID: - return cls(id=resource["id"], external_id=resource.get("externalId")) + return cls( + id=resource["id"], + external_id=resource.get("externalId"), + instance_id=NodeId.load(resource["instanceId"]) if "instanceId" in resource else None, + ) def dump(self, camel_case: bool = True) -> dict[str, Any]: resource: dict[str, Any] = {"id": self.id} if self.external_id is not None: resource["externalId" if camel_case else "external_id"] = self.external_id + if self.instance_id is not None: + resource["instanceId" if camel_case else "instance_id"] = self.instance_id.dump(include_instance_type=False) return resource diff --git a/pyproject.toml b/pyproject.toml index fa5e0de08b..082e121ef1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "cognite-sdk" -version = "7.67.1" +version = "7.67.2" description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com" diff --git a/tests/tests_integration/test_api/test_datapoint_subscriptions.py b/tests/tests_integration/test_api/test_datapoint_subscriptions.py index 12fd933248..ccd909ac4f 100644 --- a/tests/tests_integration/test_api/test_datapoint_subscriptions.py +++ b/tests/tests_integration/test_api/test_datapoint_subscriptions.py @@ -3,6 +3,7 @@ import math import random import time +import unittest from contextlib import contextmanager from datetime import datetime @@ -12,6 +13,8 @@ from cognite.client import CogniteClient from cognite.client.data_classes import TimeSeries, filters +from cognite.client.data_classes.data_modeling import NodeId, SpaceApply +from cognite.client.data_classes.data_modeling.cdm.v1 import CogniteTimeSeriesApply from cognite.client.data_classes.datapoints_subscriptions import ( DatapointSubscription, DatapointSubscriptionProperty, @@ -53,11 +56,42 @@ def all_time_series_external_ids(cognite_client: CogniteClient, os_and_py_versio ).as_external_ids() +@pytest.fixture(scope="session") +def timeseries_space(cognite_client: CogniteClient) -> str: + return cognite_client.data_modeling.spaces.apply( + SpaceApply("PYSDK_timeSeries_subscription", description="Space for testing datapoint subscriptions") + ).space + + +@pytest.fixture(scope="session") +def all_times_series_node_ids( + cognite_client: CogniteClient, os_and_py_version: str, timeseries_space: str +) -> list[NodeId]: + timeseries_node_ids = [f"PYSDK DataPoint Subscription Node {os_and_py_version} Test {no}" for no in range(20)] + + return cognite_client.data_modeling.instances.apply( + [ + CogniteTimeSeriesApply( + space=timeseries_space, + external_id=external_id, + is_step=False, + time_series_type="numeric", + ) + for external_id in timeseries_node_ids + ] + ).nodes.as_ids() + + @pytest.fixture def time_series_external_ids(all_time_series_external_ids): # Spread the load to avoid API errors like 'a ts can't be part of too many subscriptions': ts_xids = all_time_series_external_ids[:] - return random.sample(ts_xids, k=3) + return random.sample(ts_xids, k=2) + + +@pytest.fixture +def time_series_node_ids(all_times_series_node_ids: list[NodeId]) -> list[NodeId]: + return [random.choice(all_times_series_node_ids)] @pytest.fixture(scope="session") @@ -96,13 +130,14 @@ def test_list_subscriptions(self, cognite_client: CogniteClient) -> None: assert len(subscriptions) > 0, "Add at least one subscription to the test environment to run this test" def test_create_retrieve_delete_subscription( - self, cognite_client: CogniteClient, time_series_external_ids: list[str] + self, cognite_client: CogniteClient, time_series_external_ids: list[str], time_series_node_ids: list[NodeId] ): data_set = cognite_client.data_sets.list(limit=1)[0] new_subscription = DataPointSubscriptionWrite( external_id=f"PYSDKDataPointSubscriptionCreateRetrieveDeleteTest-{random_string(10)}", name="PYSDKDataPointSubscriptionCreateRetrieveDeleteTest", time_series_ids=time_series_external_ids, + instance_ids=time_series_node_ids, partition_count=1, data_set_id=data_set.id, ) @@ -111,7 +146,9 @@ def test_create_retrieve_delete_subscription( assert created.created_time assert created.last_updated_time - assert created.time_series_count == len(new_subscription.time_series_ids) + assert created.time_series_count == len(new_subscription.time_series_ids) + len( + new_subscription.instance_ids + ) assert retrieved_subscription.external_id == new_subscription.external_id == created.external_id assert retrieved_subscription.name == new_subscription.name == created.name assert retrieved_subscription.description == new_subscription.description == created.description @@ -120,8 +157,14 @@ def test_create_retrieve_delete_subscription( time_series_in_subscription = cognite_client.time_series.subscriptions.list_member_time_series( new_subscription.external_id, limit=10 ) - retrieved_time_series_external_ids = [ts.external_id for ts in time_series_in_subscription] - assert sorted(new_subscription.time_series_ids) == sorted(retrieved_time_series_external_ids) + retrieved_time_series_external_ids = [ + ts.external_id for ts in time_series_in_subscription if ts.external_id + ] + unittest.TestCase().assertCountEqual(new_subscription.time_series_ids, retrieved_time_series_external_ids) + retrieved_time_series_instance_ids = [ + ts.instance_id for ts in time_series_in_subscription if ts.instance_id + ] + unittest.TestCase().assertCountEqual(new_subscription.instance_ids, retrieved_time_series_instance_ids) cognite_client.time_series.subscriptions.delete(new_subscription.external_id) retrieved_deleted = cognite_client.time_series.subscriptions.retrieve(new_subscription.external_id)