diff --git a/cognite/extractorutils/statestore.py b/cognite/extractorutils/statestore.py index 242e9eb8..5a8f9838 100644 --- a/cognite/extractorutils/statestore.py +++ b/cognite/extractorutils/statestore.py @@ -138,11 +138,13 @@ def __init__( self._deleted: List[str] = [] - def start(self) -> None: + def start(self, initialize: bool = True) -> None: """ Start saving state periodically if save_interval is set. This calls the synchronize method every save_interval seconds. """ + if initialize and not self._initialized: + self.initialize() if self.save_interval is not None: self.thread.start() @@ -416,13 +418,13 @@ def impl() -> None: """ Upload local state store to CDF """ - self._cdf_client.raw.rows.insert(db_name=self.database, table_name=self.table, row=self._local_state) - # Create a copy of deleted to facilitate testing (mock library stores list, and as it changes, the - # assertions fail) - self._cdf_client.raw.rows.delete( - db_name=self.database, table_name=self.table, key=[k for k in self._deleted] - ) with self.lock: + self._cdf_client.raw.rows.insert(db_name=self.database, table_name=self.table, row=self._local_state) + # Create a copy of deleted to facilitate testing (mock library stores list, and as it changes, the + # assertions fail) + self._cdf_client.raw.rows.delete( + db_name=self.database, table_name=self.table, key=[k for k in self._deleted] + ) self._deleted.clear() impl() @@ -501,10 +503,9 @@ def synchronize(self) -> None: """ Save states to specified JSON file """ - with open(self._file_path, "w") as f: - json.dump(self._local_state, f, cls=_DecimalEncoder) - with self.lock: + with open(self._file_path, "w") as f: + json.dump(self._local_state, f, cls=_DecimalEncoder) self._deleted.clear() def __enter__(self) -> "LocalStateStore":