Skip to content

Commit

Permalink
Fix a race condition resulting in broken state stores (#325)
Browse files Browse the repository at this point in the history
Daemon threads do not block the program from exiting. This means that if
the state store was in the middle of syncing while the extractor shut
down, it would stop writing midway through, leaving a broken JSON file.

Turning off daemon mode if a cancellation token was provided should fix
this issue. I also noticed we actually never set the cancellation token
for any state store when using the base class, so I fixed that too.
  • Loading branch information
mathialo authored May 8, 2024
1 parent d743ddb commit 5f5681e
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 8 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## 7.1.4

### Fixed

* Fixed a race condition in state stores and uploaders where a shutdown could result in corrupted state stores.

## 7.1.3

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "7.1.3"
__version__ = "7.1.4"
from .base import Extractor
12 changes: 10 additions & 2 deletions cognite/extractorutils/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,17 @@ def recursive_find_state_store(d: Dict[str, Any]) -> Optional[StateStoreConfig]:

state_store_config = recursive_find_state_store(self.config.__dict__)
if state_store_config:
self.state_store = state_store_config.create_state_store(self.cognite_client, self.use_default_state_store)
self.state_store = state_store_config.create_state_store(
cdf_client=self.cognite_client,
default_to_local=self.use_default_state_store,
cancellation_token=self.cancellation_token,
)
else:
self.state_store = LocalStateStore("states.json") if self.use_default_state_store else NoStateStore()
self.state_store = (
LocalStateStore("states.json", cancellation_token=self.cancellation_token)
if self.use_default_state_store
else NoStateStore()
)

try:
self.state_store.initialize()
Expand Down
9 changes: 7 additions & 2 deletions cognite/extractorutils/configtools/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,10 @@ class StateStoreConfig:
local: Optional[LocalStateStoreConfig] = None

def create_state_store(
self, cdf_client: Optional[CogniteClient] = None, default_to_local: bool = True
self,
cdf_client: Optional[CogniteClient] = None,
default_to_local: bool = True,
cancellation_token: Optional[CancellationToken] = None,
) -> AbstractStateStore:
"""
Create a state store object based on the config.
Expand All @@ -648,15 +651,17 @@ def create_state_store(
database=self.raw.database,
table=self.raw.table,
save_interval=self.raw.upload_interval.seconds,
cancellation_token=cancellation_token,
)

if self.local:
return LocalStateStore(
file_path=self.local.path,
save_interval=self.local.save_interval.seconds,
cancellation_token=cancellation_token,
)

if default_to_local:
return LocalStateStore(file_path="states.json")
return LocalStateStore(file_path="states.json", cancellation_token=cancellation_token)
else:
return NoStateStore()
2 changes: 1 addition & 1 deletion cognite/extractorutils/statestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def __init__(

self.logger = logging.getLogger(__name__)

self.thread = threading.Thread(target=self._run, daemon=True, name=thread_name)
self.thread = threading.Thread(target=self._run, daemon=cancellation_token is None, name=thread_name)
self.lock = threading.RLock()
self.cancellation_token = cancellation_token.create_child_token() if cancellation_token else CancellationToken()

Expand Down
2 changes: 1 addition & 1 deletion cognite/extractorutils/uploader/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(
self.trigger_log_level = _resolve_log_level(trigger_log_level)
self.logger = logging.getLogger(__name__)

self.thread = threading.Thread(target=self._run, daemon=True, name=thread_name)
self.thread = threading.Thread(target=self._run, daemon=cancellation_token is None, name=thread_name)
self.lock = threading.RLock()
self.cancellation_token: CancellationToken = (
cancellation_token.create_child_token() if cancellation_token else CancellationToken()
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-extractor-utils"
version = "7.1.3"
version = "7.1.4"
description = "Utilities for easier development of extractors for CDF"
authors = ["Mathias Lohne <[email protected]>"]
license = "Apache-2.0"
Expand Down
6 changes: 6 additions & 0 deletions tests/tests_unit/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def test_load_state_store(get_client_mock):
e2._load_state_store()
assert isinstance(e2.state_store, LocalStateStore)

# Make sure the state store has been given a child token
assert e2.state_store.cancellation_token._parent is e2.cancellation_token

e3 = Extractor(
name="my_extractor3",
description="description",
Expand All @@ -76,6 +79,9 @@ def test_load_state_store(get_client_mock):
e3._load_state_store()
assert isinstance(e3.state_store, LocalStateStore)

# Make sure the state store has been given a child token
assert e3.state_store.cancellation_token._parent is e3.cancellation_token

e6 = Extractor(
name="my_extractor6",
description="description",
Expand Down

0 comments on commit 5f5681e

Please sign in to comment.