Datapoint subscription Instance ID (#2033)
doctrino authored Nov 19, 2024
1 parent 0691b84 commit 578825b
Showing 6 changed files with 112 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.67.2] - 2024-11-19
### Added
- Instance ID is now supported for DatapointsSubscriptionsAPI (`client.time_series.subscriptions`)

## [7.67.1] - 2024-11-19
### Added
- Workflow triggers support metadata field
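To make the 7.67.2 entry above concrete, here is a minimal sketch of the new capability. It assumes a configured CogniteClient; the space name "my_space" and the node external IDs are placeholders, and it mirrors the docstring example added in this commit rather than documenting additional API surface.

from cognite.client import CogniteClient
from cognite.client.data_classes import DataPointSubscriptionWrite
from cognite.client.data_classes.data_modeling import NodeId

client = CogniteClient()

# The time series are assumed to already exist as CogniteTimeSeries nodes
# (or nodes of an extension of CogniteTimeSeries) in the given space.
sub = DataPointSubscriptionWrite(
    external_id="my_subscription",
    name="My subscription with instance IDs",
    partition_count=1,
    instance_ids=[NodeId("my_space", "ts_one"), NodeId("my_space", "ts_two")],
)
created = client.time_series.subscriptions.create(sub)

# Member time series of the subscription expose the new TimeSeriesID.instance_id field.
for ts_id in client.time_series.subscriptions.list_member_time_series("my_subscription"):
    print(ts_id.id, ts_id.instance_id)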
14 changes: 14 additions & 0 deletions cognite/client/_api/datapoints_subscriptions.py
@@ -85,6 +85,20 @@ def create(self, subscription: DataPointSubscriptionWrite) -> DatapointSubscript
... time_series_ids=["myFirstTimeSeries", "mySecondTimeSeries"])
>>> created = client.time_series.subscriptions.create(sub)
Create a subscription with explicit time series IDs given as Node IDs
either from CogniteTimeSeries or an extension of CogniteTimeSeries:
>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionWrite
>>> from cognite.client.data_classes.data_modeling import NodeId
>>> client = CogniteClient()
>>> sub = DataPointSubscriptionWrite(
... external_id="my_subscription",
... name="My subscription with Data Model Ids",
... partition_count=1,
... instance_ids=[NodeId("my_space", "myFirstTimeSeries"), NodeId("my_space", "mySecondTimeSeries")])
>>> created = client.time_series.subscriptions.create(sub)
Create a filter-defined subscription for all numeric time series that are stepwise:
>>> from cognite.client import CogniteClient
2 changes: 1 addition & 1 deletion cognite/client/_version.py
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.67.1"
__version__ = "7.67.2"
__api_subversion__ = "20230101"
49 changes: 44 additions & 5 deletions cognite/client/data_classes/datapoints_subscriptions.py
@@ -21,6 +21,7 @@
WriteableCogniteResource,
WriteableCogniteResourceList,
)
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.data_classes.filters import _BASIC_FILTERS as _FILTERS_SUPPORTED
from cognite.client.data_classes.filters import Filter, _validate_filter
from cognite.client.utils import _json
@@ -125,6 +126,7 @@ class DataPointSubscriptionWrite(DatapointSubscriptionCore):
external_id (str): Externally provided ID for the subscription. Must be unique.
partition_count (int): The maximum effective parallelism of this subscription (the number of clients that can read from it concurrently) will be limited to this number, but a higher partition count will cause a higher time overhead. The partition count must be between 1 and 100. CAVEAT: This cannot change after the subscription has been created.
time_series_ids (list[ExternalId] | None): List of (external) ids of time series that this subscription will listen to. Not compatible with filter.
instance_ids (list[NodeId] | None): List of instance ids of time series that this subscription will listen to. Not compatible with filter.
filter (Filter | None): A filter DSL (Domain Specific Language) to define advanced filter queries. Not compatible with time_series_ids.
name (str | None): No description.
description (str | None): A summary explanation for the subscription.
@@ -136,16 +138,18 @@ def __init__(
external_id: str,
partition_count: int,
time_series_ids: list[ExternalId] | None = None,
instance_ids: list[NodeId] | None = None,
filter: Filter | None = None,
name: str | None = None,
description: str | None = None,
data_set_id: int | None = None,
) -> None:
if not exactly_one_is_not_none(time_series_ids, filter):
raise ValueError("Exactly one of time_series_ids and filter must be given")
if not exactly_one_is_not_none(time_series_ids or instance_ids, filter):
raise ValueError("Exactly one of time_series_ids/instance_ids or filter must be provided")
_validate_filter(filter, _FILTERS_SUPPORTED, "DataPointSubscriptions")
super().__init__(external_id, partition_count, filter, name, description, data_set_id)
self.time_series_ids = time_series_ids
self.instance_ids = instance_ids

@classmethod
def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> Self:
@@ -154,12 +158,19 @@ def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> S
external_id=resource["externalId"],
partition_count=resource["partitionCount"],
time_series_ids=resource.get("timeSeriesIds"),
instance_ids=[NodeId.load(item) for item in resource["instanceIds"]] if "instanceIds" in resource else None,
filter=filter,
name=resource.get("name"),
description=resource.get("description"),
data_set_id=resource.get("dataSetId"),
)

def dump(self, camel_case: bool = True) -> dict[str, Any]:
data = super().dump(camel_case)
if self.instance_ids is not None:
data["instanceIds"] = [item.dump(include_instance_type=False) for item in self.instance_ids]
return data

def as_write(self) -> DataPointSubscriptionWrite:
"""Returns this DatapointSubscription instance"""
return self
@@ -197,6 +208,16 @@ def add(self, value: list) -> DataPointSubscriptionUpdate:
def remove(self, value: list) -> DataPointSubscriptionUpdate:
return self._remove(value)

class _ListDataPointSubscriptionNodeIdUpdate(CogniteListUpdate):
def set(self, value: list[NodeId]) -> DataPointSubscriptionUpdate:
return self._set([item.dump(include_instance_type=False) for item in value])

def add(self, value: list[NodeId]) -> DataPointSubscriptionUpdate:
return self._add([item.dump(include_instance_type=False) for item in value])

def remove(self, value: list[NodeId]) -> DataPointSubscriptionUpdate:
return self._remove([item.dump(include_instance_type=False) for item in value])

@property
def name(self) -> _PrimitiveDataPointSubscriptionUpdate:
return DataPointSubscriptionUpdate._PrimitiveDataPointSubscriptionUpdate(self, "name")
@@ -209,6 +230,10 @@ def data_set_id(self) -> _PrimitiveDataPointSubscriptionUpdate:
def time_series_ids(self) -> _ListDataPointSubscriptionUpdate:
return DataPointSubscriptionUpdate._ListDataPointSubscriptionUpdate(self, "timeSeriesIds")

@property
def instance_ids(self) -> _ListDataPointSubscriptionNodeIdUpdate:
return DataPointSubscriptionUpdate._ListDataPointSubscriptionNodeIdUpdate(self, "instanceIds")

@property
def filter(self) -> _FilterDataPointSubscriptionUpdate:
return DataPointSubscriptionUpdate._FilterDataPointSubscriptionUpdate(self, "filter")
@@ -218,6 +243,7 @@ def _get_update_properties(cls, item: CogniteResource | None = None) -> list[Pro
return [
PropertySpec("name"),
PropertySpec("time_series_ids", is_list=True),
PropertySpec("instance_ids", is_list=True),
PropertySpec("filter", is_nullable=False),
PropertySpec("data_set_id"),
]
@@ -230,23 +256,36 @@ class TimeSeriesID(CogniteResource):
Args:
id (int): A server-generated ID for the object.
external_id (ExternalId | None): The external ID provided by the client. Must be unique for the resource type.
instance_id (NodeId | None): The ID of an instance in Cognite Data Models.
"""

def __init__(self, id: int, external_id: ExternalId | None = None) -> None:
def __init__(self, id: int, external_id: ExternalId | None = None, instance_id: NodeId | None = None) -> None:
self.id = id
self.external_id = external_id
self.instance_id = instance_id

def __repr__(self) -> str:
return f"TimeSeriesID(id={self.id}, external_id={self.external_id})"
identifier = f"id={self.id}"
if self.external_id is not None:
identifier += f", external_id={self.external_id}"
elif self.instance_id is not None:
identifier += f", instance_id={self.instance_id!r}"
return f"TimeSeriesID({identifier})"

@classmethod
def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> TimeSeriesID:
return cls(id=resource["id"], external_id=resource.get("externalId"))
return cls(
id=resource["id"],
external_id=resource.get("externalId"),
instance_id=NodeId.load(resource["instanceId"]) if "instanceId" in resource else None,
)

def dump(self, camel_case: bool = True) -> dict[str, Any]:
resource: dict[str, Any] = {"id": self.id}
if self.external_id is not None:
resource["externalId" if camel_case else "external_id"] = self.external_id
if self.instance_id is not None:
resource["instanceId" if camel_case else "instance_id"] = self.instance_id.dump(include_instance_type=False)
return resource


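The new _ListDataPointSubscriptionNodeIdUpdate class and the instance_ids update property above suggest that the instance members of an existing subscription can be changed with the SDK's usual update pattern. A hedged sketch, mirroring how time_series_ids updates already work; the subscription external ID and node ID are placeholders:

from cognite.client import CogniteClient
from cognite.client.data_classes import DataPointSubscriptionUpdate
from cognite.client.data_classes.data_modeling import NodeId

client = CogniteClient()

# Add one instance-id member to an existing subscription.
update = DataPointSubscriptionUpdate(external_id="my_subscription").instance_ids.add(
    [NodeId("my_space", "ts_three")]
)
client.time_series.subscriptions.update(update)

Note that each NodeId is serialized without its instance type, i.e. NodeId("my_space", "ts_three").dump(include_instance_type=False) gives {"space": "my_space", "externalId": "ts_three"}, matching the dump() added to DataPointSubscriptionWrite in this commit.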
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.67.1"
version = "7.67.2"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
53 changes: 48 additions & 5 deletions tests/tests_integration/test_api/test_datapoint_subscriptions.py
Expand Up @@ -3,6 +3,7 @@
import math
import random
import time
import unittest
from contextlib import contextmanager
from datetime import datetime

@@ -12,6 +13,8 @@

from cognite.client import CogniteClient
from cognite.client.data_classes import TimeSeries, filters
from cognite.client.data_classes.data_modeling import NodeId, SpaceApply
from cognite.client.data_classes.data_modeling.cdm.v1 import CogniteTimeSeriesApply
from cognite.client.data_classes.datapoints_subscriptions import (
DatapointSubscription,
DatapointSubscriptionProperty,
@@ -53,11 +56,42 @@ def all_time_series_external_ids(cognite_client: CogniteClient, os_and_py_versio
).as_external_ids()


@pytest.fixture(scope="session")
def timeseries_space(cognite_client: CogniteClient) -> str:
return cognite_client.data_modeling.spaces.apply(
SpaceApply("PYSDK_timeSeries_subscription", description="Space for testing datapoint subscriptions")
).space


@pytest.fixture(scope="session")
def all_time_series_node_ids(
cognite_client: CogniteClient, os_and_py_version: str, timeseries_space: str
) -> list[NodeId]:
timeseries_node_ids = [f"PYSDK DataPoint Subscription Node {os_and_py_version} Test {no}" for no in range(20)]

return cognite_client.data_modeling.instances.apply(
[
CogniteTimeSeriesApply(
space=timeseries_space,
external_id=external_id,
is_step=False,
time_series_type="numeric",
)
for external_id in timeseries_node_ids
]
).nodes.as_ids()


@pytest.fixture
def time_series_external_ids(all_time_series_external_ids):
# Spread the load to avoid API errors like 'a ts can't be part of too many subscriptions':
ts_xids = all_time_series_external_ids[:]
return random.sample(ts_xids, k=3)
return random.sample(ts_xids, k=2)


@pytest.fixture
def time_series_node_ids(all_time_series_node_ids: list[NodeId]) -> list[NodeId]:
return [random.choice(all_time_series_node_ids)]


@pytest.fixture(scope="session")
@@ -96,13 +130,14 @@ def test_list_subscriptions(self, cognite_client: CogniteClient) -> None:
assert len(subscriptions) > 0, "Add at least one subscription to the test environment to run this test"

def test_create_retrieve_delete_subscription(
self, cognite_client: CogniteClient, time_series_external_ids: list[str]
self, cognite_client: CogniteClient, time_series_external_ids: list[str], time_series_node_ids: list[NodeId]
):
data_set = cognite_client.data_sets.list(limit=1)[0]
new_subscription = DataPointSubscriptionWrite(
external_id=f"PYSDKDataPointSubscriptionCreateRetrieveDeleteTest-{random_string(10)}",
name="PYSDKDataPointSubscriptionCreateRetrieveDeleteTest",
time_series_ids=time_series_external_ids,
instance_ids=time_series_node_ids,
partition_count=1,
data_set_id=data_set.id,
)
@@ -111,7 +146,9 @@ def test_create_retrieve_delete_subscription(

assert created.created_time
assert created.last_updated_time
assert created.time_series_count == len(new_subscription.time_series_ids)
assert created.time_series_count == len(new_subscription.time_series_ids) + len(
new_subscription.instance_ids
)
assert retrieved_subscription.external_id == new_subscription.external_id == created.external_id
assert retrieved_subscription.name == new_subscription.name == created.name
assert retrieved_subscription.description == new_subscription.description == created.description
@@ -120,8 +157,14 @@
time_series_in_subscription = cognite_client.time_series.subscriptions.list_member_time_series(
new_subscription.external_id, limit=10
)
retrieved_time_series_external_ids = [ts.external_id for ts in time_series_in_subscription]
assert sorted(new_subscription.time_series_ids) == sorted(retrieved_time_series_external_ids)
retrieved_time_series_external_ids = [
ts.external_id for ts in time_series_in_subscription if ts.external_id
]
unittest.TestCase().assertCountEqual(new_subscription.time_series_ids, retrieved_time_series_external_ids)
retrieved_time_series_instance_ids = [
ts.instance_id for ts in time_series_in_subscription if ts.instance_id
]
unittest.TestCase().assertCountEqual(new_subscription.instance_ids, retrieved_time_series_instance_ids)

cognite_client.time_series.subscriptions.delete(new_subscription.external_id)
retrieved_deleted = cognite_client.time_series.subscriptions.retrieve(new_subscription.external_id)
