From 5f5681e19b34e60f25ebdc041d2262a6a4b41265 Mon Sep 17 00:00:00 2001 From: Mathias Lohne Date: Wed, 8 May 2024 08:27:38 +0200 Subject: [PATCH] Fix a race condition resulting in broken state stores (#325) Daemon threads do not block the program from exiting. This means that if the state store was in the middle of syncing while the extractor shut down, it would stop writing mid-way through, leaving a broken JSON file. Turning off daemon mode if a cancellation token was provided should fix this issue. I also noticed we actually never set the cancellation token for any state store when using the base class, so I fixed that too. --- CHANGELOG.md | 6 ++++++ cognite/extractorutils/__init__.py | 2 +- cognite/extractorutils/base.py | 12 ++++++++++-- cognite/extractorutils/configtools/elements.py | 9 +++++++-- cognite/extractorutils/statestore.py | 2 +- cognite/extractorutils/uploader/_base.py | 2 +- pyproject.toml | 2 +- tests/tests_unit/test_base.py | 6 ++++++ 8 files changed, 33 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index edea7047..0885d6e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,12 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## 7.1.4 + +### Fixed + + * Fixed a race condition in state stores and uploaders where a shutdown could result in corrupted state stores. + ## 7.1.3 ### Fixed diff --git a/cognite/extractorutils/__init__.py b/cognite/extractorutils/__init__.py index e9feb254..2f5daaa9 100644 --- a/cognite/extractorutils/__init__.py +++ b/cognite/extractorutils/__init__.py @@ -16,5 +16,5 @@ Cognite extractor utils is a Python package that simplifies the development of new extractors. 
""" -__version__ = "7.1.3" +__version__ = "7.1.4" from .base import Extractor diff --git a/cognite/extractorutils/base.py b/cognite/extractorutils/base.py index d2fb8eca..222dccd2 100644 --- a/cognite/extractorutils/base.py +++ b/cognite/extractorutils/base.py @@ -189,9 +189,17 @@ def recursive_find_state_store(d: Dict[str, Any]) -> Optional[StateStoreConfig]: state_store_config = recursive_find_state_store(self.config.__dict__) if state_store_config: - self.state_store = state_store_config.create_state_store(self.cognite_client, self.use_default_state_store) + self.state_store = state_store_config.create_state_store( + cdf_client=self.cognite_client, + default_to_local=self.use_default_state_store, + cancellation_token=self.cancellation_token, + ) else: - self.state_store = LocalStateStore("states.json") if self.use_default_state_store else NoStateStore() + self.state_store = ( + LocalStateStore("states.json", cancellation_token=self.cancellation_token) + if self.use_default_state_store + else NoStateStore() + ) try: self.state_store.initialize() diff --git a/cognite/extractorutils/configtools/elements.py b/cognite/extractorutils/configtools/elements.py index 06076551..810ea6f4 100644 --- a/cognite/extractorutils/configtools/elements.py +++ b/cognite/extractorutils/configtools/elements.py @@ -623,7 +623,10 @@ class StateStoreConfig: local: Optional[LocalStateStoreConfig] = None def create_state_store( - self, cdf_client: Optional[CogniteClient] = None, default_to_local: bool = True + self, + cdf_client: Optional[CogniteClient] = None, + default_to_local: bool = True, + cancellation_token: Optional[CancellationToken] = None, ) -> AbstractStateStore: """ Create a state store object based on the config. 
@@ -648,15 +651,17 @@ def create_state_store( database=self.raw.database, table=self.raw.table, save_interval=self.raw.upload_interval.seconds, + cancellation_token=cancellation_token, ) if self.local: return LocalStateStore( file_path=self.local.path, save_interval=self.local.save_interval.seconds, + cancellation_token=cancellation_token, ) if default_to_local: - return LocalStateStore(file_path="states.json") + return LocalStateStore(file_path="states.json", cancellation_token=cancellation_token) else: return NoStateStore() diff --git a/cognite/extractorutils/statestore.py b/cognite/extractorutils/statestore.py index b03be31d..b3e80dfa 100644 --- a/cognite/extractorutils/statestore.py +++ b/cognite/extractorutils/statestore.py @@ -132,7 +132,7 @@ def __init__( self.logger = logging.getLogger(__name__) - self.thread = threading.Thread(target=self._run, daemon=True, name=thread_name) + self.thread = threading.Thread(target=self._run, daemon=cancellation_token is None, name=thread_name) self.lock = threading.RLock() self.cancellation_token = cancellation_token.create_child_token() if cancellation_token else CancellationToken() diff --git a/cognite/extractorutils/uploader/_base.py b/cognite/extractorutils/uploader/_base.py index c2ea7c90..b5fb74fe 100644 --- a/cognite/extractorutils/uploader/_base.py +++ b/cognite/extractorutils/uploader/_base.py @@ -58,7 +58,7 @@ def __init__( self.trigger_log_level = _resolve_log_level(trigger_log_level) self.logger = logging.getLogger(__name__) - self.thread = threading.Thread(target=self._run, daemon=True, name=thread_name) + self.thread = threading.Thread(target=self._run, daemon=cancellation_token is None, name=thread_name) self.lock = threading.RLock() self.cancellation_token: CancellationToken = ( cancellation_token.create_child_token() if cancellation_token else CancellationToken() diff --git a/pyproject.toml b/pyproject.toml index 8662e81c..3028489b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ 
[tool.poetry] name = "cognite-extractor-utils" -version = "7.1.3" +version = "7.1.4" description = "Utilities for easier development of extractors for CDF" authors = ["Mathias Lohne "] license = "Apache-2.0" diff --git a/tests/tests_unit/test_base.py b/tests/tests_unit/test_base.py index 44f4dd15..f745153e 100644 --- a/tests/tests_unit/test_base.py +++ b/tests/tests_unit/test_base.py @@ -65,6 +65,9 @@ def test_load_state_store(get_client_mock): e2._load_state_store() assert isinstance(e2.state_store, LocalStateStore) + # Make sure the state store have been given a child token + assert e2.state_store.cancellation_token._parent is e2.cancellation_token + e3 = Extractor( name="my_extractor3", description="description", @@ -76,6 +79,9 @@ def test_load_state_store(get_client_mock): e3._load_state_store() assert isinstance(e3.state_store, LocalStateStore) + # Make sure the state store have been given a child token + assert e3.state_store.cancellation_token._parent is e3.cancellation_token + e6 = Extractor( name="my_extractor6", description="description",