diff --git a/cognite/client/_api/datapoints.py b/cognite/client/_api/datapoints.py index d12a8c467c..2ed2e43ce0 100644 --- a/cognite/client/_api/datapoints.py +++ b/cognite/client/_api/datapoints.py @@ -1250,7 +1250,6 @@ def insert( Your datapoints can be a list of tuples where the first element is the timestamp and the second element is the value:: - >>> from cognite.client import CogniteClient >>> from datetime import datetime, timezone >>> c = CogniteClient() diff --git a/tests/tests_integration/test_api/test_datapoint_subscriptions.py b/tests/tests_integration/test_api/test_datapoint_subscriptions.py index 51b1879f2d..2e98a8d4b6 100644 --- a/tests/tests_integration/test_api/test_datapoint_subscriptions.py +++ b/tests/tests_integration/test_api/test_datapoint_subscriptions.py @@ -1,7 +1,11 @@ from __future__ import annotations +import random import time +from contextlib import contextmanager +from datetime import datetime +import numpy as np import pandas as pd import pytest @@ -13,34 +17,50 @@ DatapointSubscriptionFilterProperties, DataPointSubscriptionUpdate, ) +from cognite.client.utils._text import random_string TIMESERIES_EXTERNAL_IDS = [f"PYSDK DataPoint Subscription Test {no}" for no in range(10)] +@contextmanager +def create_subscription_with_cleanup( + client: CogniteClient, sub_to_create: DataPointSubscriptionCreate +) -> DatapointSubscription: + sub = None + try: + yield (sub := client.time_series.subscriptions.create(sub_to_create)) + finally: + if sub: + client.time_series.subscriptions.delete(external_id=sub.external_id, ignore_unknown_ids=True) + + @pytest.fixture(scope="session") -def time_series_external_ids(cognite_client: CogniteClient) -> list[str]: - existing = cognite_client.time_series.retrieve_multiple( +def all_time_series_external_ids(cognite_client: CogniteClient) -> list[str]: + existing_xids = cognite_client.time_series.retrieve_multiple( external_ids=TIMESERIES_EXTERNAL_IDS, ignore_unknown_ids=True - ) - if len(existing) == len(TIMESERIES_EXTERNAL_IDS): - return [ts.external_id for ts in existing] - upserted = cognite_client.time_series.upsert( + ).as_external_ids() + + if len(existing_xids) == len(TIMESERIES_EXTERNAL_IDS): + return existing_xids + + return cognite_client.time_series.upsert( [ - TimeSeries( - external_id=external_id, - name=external_id, - is_string=False, - ) + TimeSeries(external_id=external_id, name=external_id, is_string=False) for external_id in TIMESERIES_EXTERNAL_IDS ], mode="overwrite", - ) + ).as_external_ids() - return [ts.external_id for ts in upserted] + +@pytest.fixture +def time_series_external_ids(all_time_series_external_ids): + # Spread the load to avoid API errors like 'a ts can't be part of too many subscriptions': + ts_xids = all_time_series_external_ids[:] + return random.sample(ts_xids, k=4) @pytest.fixture(scope="session") -def subscription(cognite_client: CogniteClient, time_series_external_ids: list[str]) -> DatapointSubscription: +def subscription(cognite_client: CogniteClient, all_time_series_external_ids: list[str]) -> DatapointSubscription: external_id = "PYSDKDataPointSubscriptionTest" sub = cognite_client.time_series.subscriptions.retrieve(external_id, ignore_unknown_ids=True) if sub is not None: @@ -48,7 +68,7 @@ def subscription(cognite_client: CogniteClient, time_series_external_ids: list[s new_sub = DataPointSubscriptionCreate( external_id=external_id, name=f"{external_id}_3ts", - time_series_ids=time_series_external_ids[:3], + time_series_ids=all_time_series_external_ids[:3], partition_count=1, ) return cognite_client.time_series.subscriptions.create(new_sub) @@ -65,25 +85,19 @@ def test_create_retrieve_delete_subscription( ): # Arrange new_subscription = DataPointSubscriptionCreate( - external_id="PYSDKDataPointSubscriptionCreateRetrieveDeleteTest", + external_id=f"PYSDKDataPointSubscriptionCreateRetrieveDeleteTest-{random_string(10)}", name="PYSDKDataPointSubscriptionCreateRetrieveDeleteTest", time_series_ids=time_series_external_ids, partition_count=1, ) - created_subscription: DatapointSubscription | None = None - - # Act - try: - created_subscription = cognite_client.time_series.subscriptions.create(new_subscription) + with create_subscription_with_cleanup(cognite_client, new_subscription) as created: retrieved_subscription = cognite_client.time_series.subscriptions.retrieve(new_subscription.external_id) # Assert - assert created_subscription.created_time - assert created_subscription.last_updated_time - assert created_subscription.time_series_count == len(new_subscription.time_series_ids) - assert ( - retrieved_subscription.external_id == new_subscription.external_id == created_subscription.external_id - ) + assert created.created_time + assert created.last_updated_time + assert created.time_series_count == len(new_subscription.time_series_ids) + assert retrieved_subscription.external_id == new_subscription.external_id == created.external_id # Act cognite_client.time_series.subscriptions.delete(new_subscription.external_id) @@ -93,22 +107,16 @@ def test_create_retrieve_delete_subscription( # Assert assert retrieved_deleted is None - finally: - if created_subscription: - cognite_client.time_series.subscriptions.delete(new_subscription.external_id, ignore_unknown_ids=True) def test_update_subscription(self, cognite_client: CogniteClient, time_series_external_ids: list[str]): # Arrange new_subscription = DataPointSubscriptionCreate( - external_id="PYSDKDataPointSubscriptionUpdateTest", + external_id=f"PYSDKDataPointSubscriptionUpdateTest-{random_string(10)}", name="PYSDKDataPointSubscriptionUpdateTest", time_series_ids=time_series_external_ids, partition_count=1, ) - created: DatapointSubscription | None = None - try: - created = cognite_client.time_series.subscriptions.create(new_subscription) - + with create_subscription_with_cleanup(cognite_client, new_subscription): update = ( DataPointSubscriptionUpdate(new_subscription.external_id) .name.set("New Name") @@ -121,9 +129,6 @@ def test_update_subscription(self, cognite_client: CogniteClient, time_series_ex # Assert assert updated.name == "New Name" assert updated.time_series_count == len(time_series_external_ids) - 1 - finally: - if created: - cognite_client.time_series.subscriptions.delete(new_subscription.external_id, ignore_unknown_ids=True) def test_update_filter_defined_subscription(self, cognite_client: CogniteClient): # Arrange @@ -132,15 +137,12 @@ def test_update_filter_defined_subscription(self, cognite_client: CogniteClient) numerical_timeseries = f.And(f.Equals(p.is_string, False)) new_subscription = DataPointSubscriptionCreate( - external_id="PYSDKFilterDefinedDataPointSubscriptionUpdateTest", + external_id=f"PYSDKFilterDefinedDataPointSubscriptionUpdateTest-{random_string(10)}", name="PYSDKFilterDefinedDataPointSubscriptionUpdateTest", filter=numerical_timeseries, partition_count=1, ) - created: DatapointSubscription | None = None - try: - created = cognite_client.time_series.subscriptions.create(new_subscription) - + with create_subscription_with_cleanup(cognite_client, new_subscription): new_filter = f.And( f.Equals(p.is_string, False), f.Prefix(p.external_id, "PYSDK DataPoint Subscription Test") ) @@ -153,65 +155,56 @@ def test_update_filter_defined_subscription(self, cognite_client: CogniteClient) # Assert assert retrieved.filter.dump() == new_filter.dump() - finally: - if created: - cognite_client.time_series.subscriptions.delete(new_subscription.external_id, ignore_unknown_ids=True) def test_iterate_data_subscription_initial_call( self, cognite_client: CogniteClient, time_series_external_ids: list[str] ): # Arrange new_subscription = DataPointSubscriptionCreate( - external_id="PYSDKDataPointSubscriptionListDataTest", + external_id=f"PYSDKDataPointSubscriptionListDataTest-{random_string(10)}", name="PYSDKDataPointSubscriptionListDataTest", time_series_ids=time_series_external_ids, partition_count=1, ) - try: - cognite_client.time_series.subscriptions.create(new_subscription) - subscription = cognite_client.time_series.subscriptions.iterate_data(new_subscription.external_id) - - batch = next(subscription) + with create_subscription_with_cleanup(cognite_client, new_subscription): + subscription_changes = cognite_client.time_series.subscriptions.iterate_data(new_subscription.external_id) + batch = next(subscription_changes) assert ( len(batch.subscription_changes.added) > 0 ), "The subscription used for testing datapoint subscriptions must have at least one time series" - batch = next(subscription) + batch = next(subscription_changes) assert ( len(batch.subscription_changes.added) == 0 ), "There should be no more timeseries in the subsequent batches" - finally: - cognite_client.time_series.subscriptions.delete(new_subscription.external_id, ignore_unknown_ids=True) def test_iterate_data_subscription_changed_time_series( self, cognite_client: CogniteClient, time_series_external_ids: list[str] ): # Arrange + first_ts, second_ts = time_series_external_ids[:2] new_subscription = DataPointSubscriptionCreate( - external_id="PYSDKDataPointSubscriptionChangedTimeSeriesTest", + external_id=f"PYSDKDataPointSubscriptionChangedTimeSeriesTest-{random_string(10)}", name="PYSDKDataPointSubscriptionChangedTimeSeriesTest", - time_series_ids=[time_series_external_ids[0]], + time_series_ids=[first_ts], partition_count=1, ) - created: DatapointSubscription | None = None - try: - created = cognite_client.time_series.subscriptions.create(new_subscription) - + with create_subscription_with_cleanup(cognite_client, new_subscription): # Act - subscription = cognite_client.time_series.subscriptions.iterate_data(new_subscription.external_id) - batch = next(subscription) + subscription_changes = cognite_client.time_series.subscriptions.iterate_data(new_subscription.external_id) + batch = next(subscription_changes) # Assert - assert batch.subscription_changes.added[0].external_id == time_series_external_ids[0] + assert batch.subscription_changes.added[0].external_id == first_ts assert len(batch.updates) == 0 # Arrange update = ( DataPointSubscriptionUpdate(new_subscription.external_id) - .time_series_ids.add([time_series_external_ids[1]]) - .time_series_ids.remove([time_series_external_ids[0]]) + .time_series_ids.add([second_ts]) + .time_series_ids.remove([first_ts]) ) # Act @@ -219,30 +212,21 @@ def test_iterate_data_subscription_changed_time_series( batch = next(cognite_client.time_series.subscriptions.iterate_data(new_subscription.external_id)) # Assert - assert {a.external_id for a in batch.subscription_changes.added} == { - time_series_external_ids[1], - time_series_external_ids[0], - } - assert {a.external_id for a in batch.subscription_changes.removed} == {time_series_external_ids[0]} + assert {a.external_id for a in batch.subscription_changes.added} == {second_ts, first_ts} + assert {a.external_id for a in batch.subscription_changes.removed} == {first_ts} assert len(batch.updates) == 0 - finally: - if created: - cognite_client.time_series.subscriptions.delete(new_subscription.external_id, ignore_unknown_ids=True) def test_iterate_data_subscription_datapoints_added( self, cognite_client: CogniteClient, time_series_external_ids: list[str] ): # Arrange new_subscription = DataPointSubscriptionCreate( - external_id="PYSDKDataPointSubscriptionChangedTimeSeriesTest", + external_id=f"PYSDKDataPointSubscriptionChangedTimeSeriesTest-{random_string(10)}", name="PYSDKDataPointSubscriptionChangedTimeSeriesTest", time_series_ids=[time_series_external_ids[0]], partition_count=1, ) - created: DatapointSubscription | None = None - new_data: pd.DataFrame | None = None - try: - created = cognite_client.time_series.subscriptions.create(new_subscription) + with create_subscription_with_cleanup(cognite_client, new_subscription) as created: assert created.created_time # Act @@ -253,35 +237,34 @@ def test_iterate_data_subscription_datapoints_added( # Arrange existing_data = cognite_client.time_series.data.retrieve_dataframe(external_id=time_series_external_ids[0]) + new_values = [42, 43] new_data = pd.DataFrame( + {time_series_external_ids[0]: new_values}, index=pd.date_range(start=existing_data.index[-1] + pd.Timedelta("1d"), periods=2, freq="1d"), - data=[[42], [43]], - columns=[time_series_external_ids[0]], ) + new_timestamps = new_data.index.asi8 // 10**6 - # Act - cognite_client.time_series.data.insert_dataframe(new_data) - batch = next(cognite_client.time_series.subscriptions.iterate_data(new_subscription.external_id)) + try: + # Act + cognite_client.time_series.data.insert_dataframe(new_data) + batch = next(cognite_client.time_series.subscriptions.iterate_data(new_subscription.external_id)) - # Assert - assert batch.updates - assert ( - sum( - abs(actual.value - expected) - for actual, expected in zip(batch.updates[0].upserts, new_data[time_series_external_ids[0]].values) + # Assert + assert batch.updates + np.testing.assert_allclose( + new_values, + [dp.value for dp in batch.updates[0].upserts], + err_msg="The values of the retrieved data should be the same as the inserted data (to float precision)", ) - < 1e-6 - ), "The values of the retrieved data should be the same as the inserted data" - assert all( - (actual.timestamp == expected) - for actual, expected in zip(batch.updates[0].upserts, new_data.index.astype("int64") // 10**6) - ), "The timestamps of the retrieved data should be the same as the inserted data" - finally: - if created: - cognite_client.time_series.subscriptions.delete(new_subscription.external_id, ignore_unknown_ids=True) - if new_data is not None: + np.testing.assert_equal( + new_timestamps, + [dp.timestamp for dp in batch.updates[0].upserts], + err_msg="The timestamps of the retrieved data should be exactly equal to the inserted ones", + ) + + finally: cognite_client.time_series.data.delete_range( - new_data.index[0], new_data.index[-1] + pd.Timedelta("1d"), external_id=time_series_external_ids[0] + new_timestamps[0], new_timestamps[-1] + 1, external_id=time_series_external_ids[0] ) def test_iterate_data_subscription_jump_to_end( @@ -289,15 +272,12 @@ def test_iterate_data_subscription_jump_to_end( ): # Arrange new_subscription = DataPointSubscriptionCreate( - external_id="PYSDKDataPointSubscriptionJumpToEndTest", + external_id=f"PYSDKDataPointSubscriptionJumpToEndTest-{random_string(10)}", name="PYSDKDataPointSubscriptionJumpToEndTest", time_series_ids=time_series_external_ids, partition_count=1, ) - - created: DatapointSubscription | None = None - try: - created = cognite_client.time_series.subscriptions.create(new_subscription) + with create_subscription_with_cleanup(cognite_client, new_subscription) as created: assert created.created_time # Act @@ -311,10 +291,6 @@ def test_iterate_data_subscription_jump_to_end( if not batch.has_next: break - finally: - if created: - cognite_client.time_series.subscriptions.delete(new_subscription.external_id, ignore_unknown_ids=True) - def test_iterate_data_subscription_start_1m_ago( self, cognite_client: CogniteClient, subscription: DatapointSubscription ): @@ -325,7 +301,7 @@ def test_iterate_data_subscription_start_1m_ago( break assert added_last_minute == 0, "There should be no timeseries added in the last minute" - @pytest.mark.skip(reason="This test is flaky") + @pytest.mark.skip(reason="Using a filter (as opposed to specific identifiers) is eventually consistent") def test_update_filter_subscription_added_times_series( self, cognite_client: CogniteClient, time_series_external_ids: list[str] ): @@ -337,16 +313,13 @@ def test_update_filter_subscription_added_times_series( ) new_subscription = DataPointSubscriptionCreate( - external_id="PYSDKDataPointSubscriptionUpdateFilterTest", + external_id=f"PYSDKDataPointSubscriptionUpdateFilterTest-{random_string(10)}", name="PYSDKDataPointSubscriptionUpdateFilterTest", filter=numerical_timeseries, partition_count=1, ) - - created: DatapointSubscription | None = None created_timeseries: TimeSeries | None = None - try: - created = cognite_client.time_series.subscriptions.create(new_subscription) + with create_subscription_with_cleanup(cognite_client, new_subscription) as created: assert created.created_time # Act @@ -361,30 +334,30 @@ def test_update_filter_subscription_added_times_series( # Arrange new_numerical_timeseries = TimeSeries( - external_id="PYSDK DataPoint Subscription Test 42", + external_id=f"PYSDK DataPoint Subscription Test 42 ({random_string(10)})", name="PYSDK DataPoint Subscription Test 42", is_string=False, ) - created_timeseries = cognite_client.time_series.create(new_numerical_timeseries) - cognite_client.time_series.data.insert_dataframe( - pd.DataFrame(index=[pd.Timestamp.now()], data=[[42]], columns=[new_numerical_timeseries.external_id]) - ) - # Ensure that the subscription has been updated - time.sleep(10) + try: + created_timeseries = cognite_client.time_series.create(new_numerical_timeseries) + cognite_client.time_series.data.insert( + [(datetime.now(), 42)], external_id=new_numerical_timeseries.external_id + ) + # Ensure that the subscription has been updated + time.sleep(10) - # Act - updated_added_count = 0 - for batch in cognite_client.time_series.subscriptions.iterate_data(new_subscription.external_id): - updated_added_count += len(batch.subscription_changes.added) - if not batch.has_next: - break + # Act + updated_added_count = 0 + for batch in cognite_client.time_series.subscriptions.iterate_data(new_subscription.external_id): + updated_added_count += len(batch.subscription_changes.added) + if not batch.has_next: + break - # Assert - assert ( - initial_added_count + 1 == updated_added_count - ), "The new timeseries should be added. Note this test is flaky and may fail." - finally: - if created: - cognite_client.time_series.subscriptions.delete(new_subscription.external_id, ignore_unknown_ids=True) - if created_timeseries: - cognite_client.time_series.delete(created_timeseries.id) + # Assert + assert initial_added_count + 1 == updated_added_count, ( + "The new timeseries should be added. This is most likely because using a filter with " + "datapoint subscriptions is eventually consistent." + ) + finally: + if created_timeseries: + cognite_client.time_series.delete(created_timeseries.id)