Skip to content

Commit

Permalink
add support for instance_id in synth. dps query (#2042)
Browse files Browse the repository at this point in the history
  • Loading branch information
haakonvt authored Nov 23, 2024
1 parent a7aa775 commit 297b56e
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 14 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.69.0] - 2024-11-23
### Added
- The Synthetic Datapoints API has better support for `instance_id`. Previously, instance IDs had to be written
directly in the expression(s); now you can pass them (e.g. as `NodeId`) through the `variables` parameter and
have the time series identifiers substituted into the expression(s) for you.

## [7.68.0] - 2024-11-22
### Added
- New methods: `WorkflowTriggerAPI.[list, list_runs]`
Expand Down
53 changes: 41 additions & 12 deletions cognite/client/_api/synthetic_time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

from cognite.client._api_client import APIClient
from cognite.client.data_classes import Datapoints, DatapointsList, TimeSeries, TimeSeriesWrite
from cognite.client.data_classes.data_modeling.ids import NodeId
from cognite.client.data_classes.time_series import TimeSeriesCore
from cognite.client.utils._auxiliary import is_unlimited
from cognite.client.utils._concurrency import execute_tasks
from cognite.client.utils._identifier import Identifier, InstanceId
from cognite.client.utils._importing import local_import
from cognite.client.utils._time import timestamp_to_ms
from cognite.client.utils.useful_types import SequenceNotStr
Expand Down Expand Up @@ -51,7 +53,7 @@ def query(
start: int | str | datetime,
end: int | str | datetime,
limit: int | None = None,
variables: dict[str | sympy.Symbol, str | TimeSeries | TimeSeriesWrite] | None = None,
variables: dict[str | sympy.Symbol, str | NodeId | TimeSeries | TimeSeriesWrite] | None = None,
aggregate: str | None = None,
granularity: str | None = None,
target_unit: str | None = None,
Expand All @@ -64,7 +66,7 @@ def query(
start (int | str | datetime): Inclusive start.
end (int | str | datetime): Exclusive end.
limit (int | None): Number of datapoints per expression to retrieve.
variables (dict[str | sympy.Symbol, str | TimeSeries | TimeSeriesWrite] | None): An optional map of symbol replacements.
variables (dict[str | sympy.Symbol, str | NodeId | TimeSeries | TimeSeriesWrite] | None): An optional map of symbol replacements.
aggregate (str | None): use this aggregate when replacing entries from `variables`, does not affect time series given in the `ts{}` syntax.
granularity (str | None): use this granularity with the aggregate.
target_unit (str | None): use this target_unit when replacing entries from `variables`, does not affect time series given in the `ts{}` syntax.
Expand All @@ -75,21 +77,33 @@ def query(
Examples:
Request a synthetic time series query with direct syntax:
Execute a synthetic time series query with an expression. Here we sum three time series plus a constant. The first is referenced by ID,
the second by external ID, and the third by instance ID:
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> expression = '''
... 123
... + ts{id:123}
... + ts{externalId:'abc'}
... + ts{space:'my-space',externalId:'my-ts-xid'}
... '''
>>> dps = client.time_series.data.synthetic.query(
... expressions="ts{id:123} + ts{externalId:'abc'}",
... expressions=expression,
... start="2w-ago",
... end="now")
Use variables to re-use an expression:
You can also specify variables for an easier query syntax:
>>> from cognite.client.data_classes.data_modeling.ids import NodeId
>>> ts = client.time_series.retrieve(id=123)
>>> variables = {"A": ts, "B": "my_ts_external_id"}
>>> variables = {
... "A": ts,
... "B": "my_ts_external_id",
... "C": NodeId("my-space", "my-ts-xid"),
... }
>>> dps = client.time_series.data.synthetic.query(
... expressions="A+B", start="2w-ago", end="now", variables=variables)
... expressions="A+B+C", start="2w-ago", end="now", variables=variables)
Use sympy to build complex expressions:
Expand All @@ -116,6 +130,7 @@ def query(
user_expr, variables, aggregate, granularity, target_unit, target_unit_system
)
query = {"expression": expression, "start": timestamp_to_ms(start), "end": timestamp_to_ms(end)}
# NOTE / TODO: We misuse the 'external_id' field for the entire 'expression string':
query_datapoints = Datapoints(external_id=short_expression, value=[], error=[])
tasks.append((query, query_datapoints, limit))

Expand All @@ -142,7 +157,7 @@ def _fetch_datapoints(self, query: dict[str, Any], datapoints: Datapoints, limit
def _build_expression(
self,
expression: str | sympy.Basic,
variables: dict[str | sympy.Symbol, str | TimeSeries | TimeSeriesWrite] | None = None,
variables: dict[str | sympy.Symbol, str | NodeId | TimeSeries | TimeSeriesWrite] | None = None,
aggregate: str | None = None,
granularity: str | None = None,
target_unit: str | None = None,
Expand Down Expand Up @@ -181,11 +196,25 @@ def _build_expression(
to_substitute = {}
for k, v in variables.items():
if isinstance(v, TimeSeriesCore):
if v.external_id is None:
raise ValueError(f"TimeSeries passed in 'variables' is missing required field 'external_id' ({v})")
v = v.external_id
try:
v = Identifier.load(external_id=v.external_id, instance_id=v.instance_id).as_primitive()
except ValueError:
# NOTE: The error message of the ValueError caught above wrongly says that 'id' is accepted, which it is not here:
raise ValueError(
f"TimeSeries passed in 'variables' is missing required field 'external_id' or 'instance_id' ({v})"
) from None

if isinstance(v, str):
sub_string = f"externalId:'{v}'"
elif isinstance(v, InstanceId):
sub_string = f"space:'{v.space}',externalId:'{v.external_id}'"
else:
raise TypeError(
f"Unsupported variable type={type(v)} in 'variables', must be str, NodeId or TimeSeries object"
)

# We convert to str to ensure any sympy.Symbol is replaced with its name:
to_substitute[re.escape(str(k))] = f"ts{{externalId:'{v}'{aggregate_str}{target_unit_str}}}"
to_substitute[re.escape(str(k))] = "ts{" + sub_string + aggregate_str + target_unit_str + "}"

# Substitute all variables in one go to avoid substitution of prior substitutions:
pattern = re.compile(r"\b" + r"\b|\b".join(to_substitute) + r"\b") # note: \b marks a word boundary
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.68.0"
__version__ = "7.69.0"
__api_subversion__ = "20230101"
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.68.0"
version = "7.69.0"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
25 changes: 25 additions & 0 deletions tests/tests_integration/test_api/test_synthetic_time_series.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import random
import re
from datetime import datetime, timezone
from unittest import mock

import pytest

from cognite.client.data_classes import Datapoints, DatapointsList
from cognite.client.data_classes.data_modeling.ids import NodeId


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -82,6 +84,29 @@ def test_query_using_time_series_objs__missing_external_id(self, cognite_client,
variables={"A": test_time_series[0], "B": whoopsie_ts},
)

def test_query_using_instance_ids(self, cognite_client):
    """Check that 'variables' accepts NodeId, TimeSeries (either identifier kind) and str together.

    The instance-ID time series is named as a clone of time series 114, so both ratios
    in the expression are expected to equal 1 and the expression to evaluate to zero.
    """
    xid = "PYSDK integration test 114: 1mill dps, random distribution, 1950-2020, numeric"
    instance_id = NodeId("PySDK-DMS-time-series-integration-test", "PYSDK integration test 126: clone of 114")
    retrieved = cognite_client.time_series.retrieve_multiple(external_ids=xid, instance_ids=instance_id)
    ts_from_xid, ts_from_instance_id = retrieved

    # Randomize both the number of datapoints and the start of the query window:
    n_dps = random.choice(range(100, 1000))
    start_ms = random.choice(range(1483228800000))  # somewhere between 1970 and 2017

    # One variable per supported input type:
    substitutions = {
        "A": instance_id,  # NodeId
        "B": ts_from_xid,  # TimeSeries identified by external_id
        "C": ts_from_instance_id,  # TimeSeries identified by instance_id
        "D": xid,  # plain str, interpreted as an external ID
    }
    res = cognite_client.time_series.data.synthetic.query(
        expressions="(A / B) * (C / D) - 1",  # expected to give zeros only
        variables=substitutions,
        start=start_ms,
        end="now",
        limit=n_dps,
    )

    assert len(res) == n_dps
    assert not any(err is not None for err in res.error)
    assert all(value == 0.0 for value in res.value)  # compared against a float on purpose

@pytest.mark.dsl
def test_expression_builder_time_series_vs_string(self, cognite_client, test_time_series):
from sympy import symbols
Expand Down

0 comments on commit 297b56e

Please sign in to comment.