Skip to content

Commit

Permalink
Add method to check if subscription is alive (#1362)
Browse files Browse the repository at this point in the history
  • Loading branch information
erlendvollset authored Oct 10, 2023
1 parent b5e677f commit 5c3ba44
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 2 deletions.
2 changes: 1 addition & 1 deletion cognite/client/_api/data_modeling/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ def _do_subscribe() -> None:
thread_name = f"instances-sync-subscriber-{random_string(10)}"
thread = Thread(target=_do_subscribe, name=thread_name, daemon=True)
thread.start()

subscription_context._thread = thread
return subscription_context

@classmethod
Expand Down
5 changes: 5 additions & 0 deletions cognite/client/data_classes/data_modeling/instances.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import json
import threading
from abc import abstractmethod
from collections import defaultdict
from dataclasses import dataclass
Expand Down Expand Up @@ -821,6 +822,10 @@ class SubscriptionContext:
last_successful_sync: datetime | None = None
last_successful_callback: datetime | None = None
_canceled: bool = False
_thread: threading.Thread | None = None

def cancel(self) -> None:
self._canceled = True

def is_alive(self) -> bool:
return self._thread is not None and self._thread.is_alive()
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ def wait_for_invocation_count_to_have_value(value: int) -> None:
while State.invocation_count != value and (abs(time.time() - start_time) > max_wait_seconds):
time.sleep(0.1)

cognite_client.data_modeling.instances.subscribe(my_query, callback)
context = cognite_client.data_modeling.instances.subscribe(my_query, callback, poll_delay_seconds=2)

try:
cognite_client.data_modeling.instances.apply(nodes=new_1994_movie)
Expand All @@ -657,3 +657,14 @@ def wait_for_invocation_count_to_have_value(value: int) -> None:
wait_for_invocation_count_to_have_value(2)
finally:
cognite_client.data_modeling.instances.delete(new_1994_movie.as_id())

assert context.is_alive()
context.cancel()
# May take some time for the thread to actually die, so we check in a loop for a little while
for i in range(10):
if context.is_alive():
time.sleep(1)
else:
return
else:
assert not context.is_alive()

0 comments on commit 5c3ba44

Please sign in to comment.