Skip to content

Commit

Permalink
Some improvements to datapoints subscriptions (#1378)
Browse files Browse the repository at this point in the history
  • Loading branch information
erlendvollset authored Sep 25, 2023
1 parent 290f85d commit 42626a5
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 10 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [6.26.0] - 2023-09-22
### Added
- Support `partition` and `cursor` parameters on `time_series.subscriptions.iterate_data`
- Include the `cursor` attribute on `DatapointSubscriptionBatch`, which is yielded in every iteration
of `time_series.subscriptions.iterate_data`.

## [6.25.3] - 2023-09-19
### Added
- Support for setting and retrieving `data_set_id` in data class `client.data_classes.ThreeDModel`.
Expand All @@ -34,13 +40,13 @@ Changes are grouped as follows
(which happens when the credentials are invalid).
- While processing source- and destination credentials in `client.transformations.[create, update]`, an `AttributeError`
can no longer be raised (by not specifying project).

### Added
- `TransformationList` now correctly inherits the two (missing) helper methods `as_ids()` and `as_external_ids()`.

## [6.25.0] - 2023-09-14
### Added
- Support for `ignore_unknown_ids` in `client.functions.retrieve_multiple` method.

## [6.24.1] - 2023-09-13
### Fixed
- Bugfix for `AssetsAPI.create_hierarchy` when running in upsert mode: It could skip certain updates above
Expand Down
13 changes: 9 additions & 4 deletions cognite/client/_api/datapoints_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ def iterate_data(
external_id: str,
start: str | None = None,
limit: int = DEFAULT_LIMIT_READ,
partition: int = 0,
cursor: str | None = None,
) -> Iterator[DatapointSubscriptionBatch]:
"""`Iterate over data from a given subscription. <https://pr-2221.specs.preview.cogniteapp.com/20230101-beta.json.html#tag/Data-point-subscriptions/operation/listSubscriptionData>`_
Expand All @@ -191,6 +193,8 @@ def iterate_data(
external_id (str): The external ID of the subscription.
start (str | None): When to start the iteration. If set to None, the iteration will start from the beginning. The format is "N[timeunit]-ago", where timeunit is w,d,h,m (week, day, hour, minute). For example, "12h-ago" will start the iteration from 12 hours ago. You can also set it to "now" to jump straight to the end. Defaults to None.
limit (int): Approximate number of results to return across all partitions.
partition (int): The partition to iterate over. Defaults to 0.
cursor (str | None): Optional cursor to start iterating from.
Yields:
DatapointSubscriptionBatch: Changes to the subscription and data in the subscribed time series.
Expand All @@ -215,9 +219,7 @@ def iterate_data(
"""
self._experimental_warning()

current_partitions: list[DatapointSubscriptionPartition] = [
DatapointSubscriptionPartition.create(p) for p in [0]
]
current_partitions = [DatapointSubscriptionPartition.create((partition, cursor))]
while True:
body = {
"externalId": external_id,
Expand All @@ -231,7 +233,10 @@ def iterate_data(
res = self._post(url_path=self._RESOURCE_PATH + "/data/list", json=body)
batch = _DatapointSubscriptionBatchWithPartitions._load(res.json())

yield DatapointSubscriptionBatch(batch.updates, batch.subscription_changes, batch.has_next)
cursor = batch.partitions[0].cursor
assert cursor is not None

yield DatapointSubscriptionBatch(batch.updates, batch.subscription_changes, batch.has_next, cursor)

current_partitions = batch.partitions

Expand Down
3 changes: 1 addition & 2 deletions cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from __future__ import annotations

__version__ = "6.25.3"

__version__ = "6.26.0"
__api_subversion__ = "V20220125"
10 changes: 8 additions & 2 deletions cognite/client/data_classes/datapoints_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ class DatapointSubscriptionPartition:
cursor: str | None = None

@classmethod
def create(cls, data: tuple[int, str] | int | DatapointSubscriptionPartition) -> DatapointSubscriptionPartition:
def create(
cls, data: tuple[int, str | None] | int | DatapointSubscriptionPartition
) -> DatapointSubscriptionPartition:
if isinstance(data, DatapointSubscriptionPartition):
return data
if isinstance(data, tuple):
Expand All @@ -312,10 +314,11 @@ class DatapointSubscriptionBatch:
updates: list[DatapointsUpdate]
subscription_changes: SubscriptionTimeSeriesUpdate
has_next: bool
cursor: str


@dataclass(frozen=True)
class _DatapointSubscriptionBatchWithPartitions(DatapointSubscriptionBatch):
class _DatapointSubscriptionBatchWithPartitions:
"""A batch of data from a subscription.
Args:
Expand All @@ -325,6 +328,9 @@ class _DatapointSubscriptionBatchWithPartitions(DatapointSubscriptionBatch):
subscription_changes (SubscriptionTimeSeriesUpdate): If present, this object represents changes to the subscription definition. The subscription will now start/stop listening to changes from the time series listed here.
"""

updates: list[DatapointsUpdate]
subscription_changes: SubscriptionTimeSeriesUpdate
has_next: bool
partitions: list[DatapointSubscriptionPartition]

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "6.25.3"
version = "6.26.0"

description = "Cognite Python SDK"
readme = "README.md"
Expand Down

0 comments on commit 42626a5

Please sign in to comment.