MNT #14 refactor to avoid common Sphinx situation
prjemian committed Nov 29, 2024
1 parent 7cf2d5a commit 5da5fc7
Showing 1 changed file with 24 additions and 27 deletions.
src/instrument/utils/stored_dict.py
@@ -19,6 +19,7 @@
 
 import collections.abc
 import datetime
+import inspect
 import json
 import logging
 import pathlib
@@ -31,18 +32,6 @@
 logger.bsdev(__file__)
 
 
-# (Internal) Dictionary of active ``StoredDict`` ``sync_agent`` threads.
-#
-# A single entry for each instance of ``StoredDict``.
-# - Key is the ``id`` of an instance of ``StoredDict``.
-# - Value is the ``threading`` object (if running) or ``None``.
-# Keeping the ``threading`` object **outside** the ``StoredDict`` class
-# avoids an error from ``copy.deepcopy(md)`` that happens during Bluesky runs::
-#
-#     TypeError: cannot pickle '_thread.lock' object
-_sync_threads = {}
-
-
 class StoredDict(collections.abc.MutableMapping):
     """
     Dictionary that syncs to storage.
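
Note: the deleted registry existed only because Bluesky deep-copies the metadata dictionary (`copy.deepcopy(md)`), and any thread object stored on the instance would make that copy fail. A minimal sketch of that failure, using only the standard library:

```python
# Minimal sketch: deep-copying anything that holds a thread lock raises the
# same TypeError quoted in the deleted comment block above.
import copy
import threading

try:
    copy.deepcopy(threading.Lock())
except TypeError as exc:
    print(exc)  # cannot pickle '_thread.lock' object
```

With the registry gone, this commit keeps only a string key on the instance and looks the thread up by name (see the new `_sync_agent_running` property below).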
Expand Down Expand Up @@ -87,12 +76,9 @@ def __init__(self, file, delay=5, title=None, serializable=True):
self._title = title or f"Written by {self.__class__.__name__}."
self.test_serializable = serializable

self._sync_key = id(self)
self._sync_key = f"sync_agent_{id(self):x}"
self._sync_deadline = time.time()
self._sync_while_loop_period = 0.005
self._sync_thread_abort = False
# Need to keep the thread objects outside of the class.
_sync_threads[self._sync_key] = None

self._cache = {}
self.reload()
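
Note: the new `_sync_key` is a string rather than a bare `id(self)` because it now doubles as the agent thread's `name`, letting each instance find its own thread through `threading.enumerate()`. A sketch of that lookup idiom (the `work` function and fixed key value are hypothetical):

```python
# Sketch: discover a worker thread by name instead of holding a reference.
import threading
import time

key = f"sync_agent_{0x1234:x}"  # stands in for StoredDict's id-based key

def work():
    time.sleep(0.1)  # hypothetical placeholder task

threading.Thread(target=work, name=key).start()

# True while the named thread is alive, False once it finishes.
print(any(t.name == key for t in threading.enumerate()))
```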
@@ -119,14 +105,21 @@ def __repr__(self):
 
     def __setitem__(self, key, value):
         """Write to the dictionary."""
+        outermost_frame = inspect.getouterframes(inspect.currentframe())[-1]
+        if "sphinx-build" in outermost_frame.filename:
+            # Seems that Sphinx is building the documentation.
+            # Ignore all the objects it tries to add.
+            return
+
         if self.test_serializable:
             json.dumps({key: value})
-        self._cache[key] = value
+
+        self._cache[key] = value  # Store the new (or revised) content.
 
         # Reset the deadline.
         self._sync_deadline = time.time() + self._delay
         logger.debug("new sync deadline in %f s.", self._delay)
-        if _sync_threads[self._sync_key] is None:
+        if not self._sync_agent_running:
             self._delayed_sync_to_storage()
 
     def _delayed_sync_to_storage(self):
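
Note: the new guard walks the interpreter's call stack to its outermost frame; when the process was launched by `sphinx-build`, that frame's filename contains the script path, so documentation builds skip the write. The same check, as a standalone sketch:

```python
# Sketch: detect which script started the current process via the call stack.
import inspect

def launched_by(fragment: str) -> bool:
    """Return True if the outermost stack frame's file path contains fragment."""
    outermost = inspect.getouterframes(inspect.currentframe())[-1]
    return fragment in outermost.filename

print(launched_by("sphinx-build"))  # True only inside a Sphinx build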
@@ -140,26 +133,30 @@ def _delayed_sync_to_storage(self):
         def sync_agent():
             """Threaded task."""
             logger.debug("Starting sync_agent...")
-            while time.time() < self._sync_deadline and not self._sync_thread_abort:
+            while time.time() < self._sync_deadline:
                 time.sleep(self._sync_while_loop_period)
             logger.debug("Sync waiting period ended")
 
             StoredDict.dump(self._file, self._cache, title=self._title)
 
-            _sync_threads[self._sync_key] = None
-            self._sync_thread_abort = False
-
-        _sync_threads[self._sync_key] = threading.Thread(target=sync_agent)
-        _sync_threads[self._sync_key].start()
+        # Name the thread with self._sync_key so _sync_agent_running can find it.
+        thred = threading.Thread(target=sync_agent, name=self._sync_key)
+        thred.start()
+
+    @property
+    def _sync_agent_running(self):
+        """Is our sync_agent thread running?"""
+        verdict = False
+        for thred in threading.enumerate():
+            if thred.name == self._sync_key:
+                verdict = True
+        return verdict
 
     def flush(self):
         """Force a write of the dictionary to disk"""
         logger.debug("flush()")
-        if _sync_threads[self._sync_key] is None:
+        if not self._sync_agent_running:
             StoredDict.dump(self._file, self._cache, title=self._title)
-            # TODO: stop the thread
         else:
-            self._sync_thread_abort = True
+            self._sync_deadline = time.time()
 
     def popitem(self):
         """
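
Taken together, writes are debounced: each `__setitem__` pushes the sync deadline `delay` seconds into the future, a single named thread waits out the deadline, and `flush()` now either dumps directly (no agent running) or pulls the deadline to the present so the running agent writes immediately, replacing the old abort flag. A usage sketch (import path and string file argument are assumptions based on the file's location in this repository):

```python
# Usage sketch for the refactored class; the import path is an assumption
# based on src/instrument/utils/stored_dict.py.
from instrument.utils.stored_dict import StoredDict

md = StoredDict("metadata.json", delay=5, title="run metadata")
md["scan_id"] = 42      # starts the 5 s countdown to a disk write
md["sample"] = "NaCl"   # extends the countdown; still one pending write
md.flush()              # write now instead of waiting for the deadline
```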
