Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make subscription data calls more efficient #1538

Merged
merged 10 commits into from
Dec 11, 2023
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.5.5] - 2023-12-07
### Added
- Added `poll_timeout` parameter on `time_series.subscriptions.iterate_data`. Will keep the connection open and waiting,
until new data is available, up to `poll_timeout` seconds.

## [7.5.4] - 2023-12-06
### Changed
Expand Down
5 changes: 3 additions & 2 deletions cognite/client/_api/datapoints_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def iterate_data(
start: str | None = None,
limit: int = DEFAULT_LIMIT_READ,
partition: int = 0,
poll_timeout: int = 5,
cursor: str | None = None,
) -> Iterator[DatapointSubscriptionBatch]:
"""`Iterate over data from a given subscription. <https://pr-2221.specs.preview.cogniteapp.com/20230101-beta.json.html#tag/Data-point-subscriptions/operation/listSubscriptionData>`_
Expand All @@ -227,6 +228,7 @@ def iterate_data(
start (str | None): When to start the iteration. If set to None, the iteration will start from the beginning. The format is "N[timeunit]-ago", where timeunit is w,d,h,m (week, day, hour, minute). For example, "12h-ago" will start the iteration from 12 hours ago. You can also set it to "now" to jump straight to the end. Defaults to None.
limit (int): Approximate number of results to return across all partitions.
partition (int): The partition to iterate over. Defaults to 0.
poll_timeout (int): How many seconds to wait for new data, until an empty response is sent. Defaults to 5.
cursor (str | None): Optional cursor to start iterating from.

Yields:
Expand All @@ -252,8 +254,6 @@ def iterate_data(
... print(f"Added {len(batch.subscription_changes.added)} timeseries")
... print(f"Removed {len(batch.subscription_changes.removed)} timeseries")
... print(f"Changed timeseries data in {len(batch.updates)} updates")
... if not batch.has_next:
... time.sleep(1)
"""
self._warning.warn()

Expand All @@ -263,6 +263,7 @@ def iterate_data(
"externalId": external_id,
"partitions": [p.dump(camel_case=True) for p in current_partitions],
"limit": limit,
"pollTimeoutSeconds": poll_timeout,
}
if start is not None:
body["initializeCursors"] = start
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.5.4"
__version__ = "7.5.5"
__api_subversion__ = "V20220125"
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.5.4"
version = "7.5.5"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down