From df12688db9baf6b51568fefb5f6957b337ba25a7 Mon Sep 17 00:00:00 2001
From: Joachim Moeyens
Date: Fri, 21 Jul 2023 15:34:30 -0700
Subject: [PATCH 1/3] Make shared memory array name an instance attribute

---
 difi/metrics.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/difi/metrics.py b/difi/metrics.py
index cb052f5..b56103d 100644
--- a/difi/metrics.py
+++ b/difi/metrics.py
@@ -1,5 +1,6 @@
 import hashlib
 import multiprocessing as mp
+import os
 import warnings
 from abc import ABC, abstractmethod
 from itertools import combinations, repeat
@@ -556,7 +557,10 @@ def _store_as_shared_record_array(self, object_ids, obs_ids, times, ra, dec, nig
         self._num_observations = observations_array.shape[0]
         self._itemsize = observations_array.itemsize

-        shared_mem = shared_memory.SharedMemory("DIFI_ARRAY", create=True, size=observations_array.nbytes)
+        self._shared_memory_name = f"DIFI_ARRAY_{os.getpid()}"
+        shared_mem = shared_memory.SharedMemory(
+            self._shared_memory_name, create=True, size=observations_array.nbytes
+        )
         shared_memory_array = np.ndarray(
             observations_array.shape, dtype=observations_array.dtype, buffer=shared_mem.buf
         )
@@ -569,9 +573,8 @@ def _store_as_shared_record_array(self, object_ids, obs_ids, times, ra, dec, nig
         shared_mem.close()
         return

-    @staticmethod
-    def _clear_shared_record_array():
-        shared_mem = shared_memory.SharedMemory("DIFI_ARRAY")
+    def _clear_shared_record_array(self):
+        shared_mem = shared_memory.SharedMemory(self._shared_memory_name)
         shared_mem.unlink()

     def _run_object_worker(
@@ -626,7 +629,7 @@ def _run_object_worker(
         ]

         # Load the observations from shared memory
-        existing_shared_mem = shared_memory.SharedMemory(name="DIFI_ARRAY")
+        existing_shared_mem = shared_memory.SharedMemory(name=self._shared_memory_name)
         observations = np.ndarray(
             num_obs,
             dtype=self._dtypes,
@@ -832,7 +835,7 @@ def _run_window_worker(
         ]

         # Read observations from shared memory array
-        existing_shared_mem = shared_memory.SharedMemory(name="DIFI_ARRAY")
+        existing_shared_mem = shared_memory.SharedMemory(name=self._shared_memory_name)
         observations = np.ndarray(
             num_obs,
             dtype=self._dtypes,

From 59334cd737fa9313f748027f132d2ff3ea28101a Mon Sep 17 00:00:00 2001
From: Joachim Moeyens
Date: Fri, 21 Jul 2023 15:38:12 -0700
Subject: [PATCH 2/3] Add clear_on_failure to FindabilityMetrics.run (clears
 shared memory array on failure)

---
 difi/metrics.py | 144 ++++++++++++++++++++++++++++--------------------
 1 file changed, 85 insertions(+), 59 deletions(-)

diff --git a/difi/metrics.py b/difi/metrics.py
index b56103d..86aa8b1 100644
--- a/difi/metrics.py
+++ b/difi/metrics.py
@@ -692,6 +692,7 @@ def run_by_object(
         discovery_probability: float = 1.0,
         ignore_after_discovery: bool = False,
         num_jobs: Optional[int] = 1,
+        clear_on_failure: bool = True,
     ) -> List[pd.DataFrame]:
         """
         Run the findability metric on the observations split by objects. For windows where there are many
@@ -721,6 +722,9 @@ def run_by_object(
         num_jobs : int, optional
             The number of jobs to run in parallel. If 1, then run in serial. If None, then use the number
             of CPUs on the machine.
+        clear_on_failure : bool, optional
+            If True, clear the shared memory array when an exception is raised.
+            If False, leave the shared memory array allocated so it can be inspected after a failure.

         Returns
         -------
@@ -741,39 +745,45 @@ def run_by_object(
         # Split arrays by object
         split_by_object_slices = self._split_by_object(objects)

-        # Store the observations in a global variable so that the worker functions can access them
-        self._store_as_shared_record_array(objects, obs_ids, times, ra, dec, nights)
-
-        findable_lists: List[List[Dict[str, Any]]] = []
-        if num_jobs is None or num_jobs > 1:
-            pool = mp.Pool(num_jobs)
-            findable_lists = pool.starmap(
-                self._run_object_worker,
-                zip(
-                    split_by_object_slices,
-                    repeat(windows),
-                    repeat(discovery_opportunities),
-                    repeat(discovery_probability),
-                    repeat(ignore_after_discovery),
-                ),
-            )
+        try:
+            # Store the observations in shared memory so that the worker functions can access them
+            self._store_as_shared_record_array(objects, obs_ids, times, ra, dec, nights)
+
+            findable_lists: List[List[Dict[str, Any]]] = []
+            if num_jobs is None or num_jobs > 1:
+                pool = mp.Pool(num_jobs)
+                findable_lists = pool.starmap(
+                    self._run_object_worker,
+                    zip(
+                        split_by_object_slices,
+                        repeat(windows),
+                        repeat(discovery_opportunities),
+                        repeat(discovery_probability),
+                        repeat(ignore_after_discovery),
+                    ),
+                )

-            pool.close()
-            pool.join()
+                pool.close()
+                pool.join()

-        else:
-            for object_indices in split_by_object_slices:
-                findable_lists.append(
-                    self._run_object_worker(
-                        object_indices,
-                        windows,
-                        discovery_opportunities=discovery_opportunities,
-                        discovery_probability=discovery_probability,
-                        ignore_after_discovery=ignore_after_discovery,
+            else:
+                for object_indices in split_by_object_slices:
+                    findable_lists.append(
+                        self._run_object_worker(
+                            object_indices,
+                            windows,
+                            discovery_opportunities=discovery_opportunities,
+                            discovery_probability=discovery_probability,
+                            ignore_after_discovery=ignore_after_discovery,
+                        )
                     )
-                )

-        self._clear_shared_record_array()
+            self._clear_shared_record_array()
+
+        except Exception as e:
+            if clear_on_failure:
+                self._clear_shared_record_array()
+            raise e

         findable_flattened = [item for sublist in findable_lists for item in sublist]

@@ -900,6 +910,7 @@ def run_by_window(
         discovery_opportunities: bool = False,
         discovery_probability: float = 1.0,
         num_jobs: Optional[int] = 1,
+        clear_on_failure: bool = True,
     ) -> List[pd.DataFrame]:
         """
         Run the findability metric on the observations split by windows where each window will
@@ -925,6 +936,9 @@ def run_by_window(
         num_jobs : int, optional
             The number of jobs to run in parallel. If 1, then run in serial. If None, then use the number
             of CPUs on the machine.
+        clear_on_failure : bool, optional
+            If True, clear the shared memory array when an exception is raised.
+            If False, leave the shared memory array allocated so it can be inspected after a failure.

         Returns
         -------
@@ -942,39 +956,45 @@ def run_by_window(
             observations["night"].values,
         )

-        # Store the observations in a global variable so that the worker functions can access them
-        self._store_as_shared_record_array(objects, obs_ids, times, ra, dec, nights)
-
-        # Find indices that split the observations into windows
-        split_by_window_slices = self._split_by_window(windows, nights)
-
-        findable_lists: List[List[Dict[str, Any]]] = []
-        if num_jobs is None or num_jobs > 1:
-            pool = mp.Pool(num_jobs)
-            findable_lists = pool.starmap(
-                self._run_window_worker,
-                zip(
-                    split_by_window_slices,
-                    range(len(windows)),
-                    repeat(discovery_opportunities),
-                    repeat(discovery_probability),
-                ),
-            )
-            pool.close()
-            pool.join()
+        try:
+            # Store the observations in shared memory so that the worker functions can access them
+            self._store_as_shared_record_array(objects, obs_ids, times, ra, dec, nights)
+
+            # Find indices that split the observations into windows
+            split_by_window_slices = self._split_by_window(windows, nights)
+
+            findable_lists: List[List[Dict[str, Any]]] = []
+            if num_jobs is None or num_jobs > 1:
+                pool = mp.Pool(num_jobs)
+                findable_lists = pool.starmap(
+                    self._run_window_worker,
+                    zip(
+                        split_by_window_slices,
+                        range(len(windows)),
+                        repeat(discovery_opportunities),
+                        repeat(discovery_probability),
+                    ),
+                )
+                pool.close()
+                pool.join()

-        else:
-            for i, window_slice in enumerate(split_by_window_slices):
-                findable_lists.append(
-                    self._run_window_worker(
-                        window_slice,
-                        i,
-                        discovery_opportunities=discovery_opportunities,
-                        discovery_probability=discovery_probability,
+            else:
+                for i, window_slice in enumerate(split_by_window_slices):
+                    findable_lists.append(
+                        self._run_window_worker(
+                            window_slice,
+                            i,
+                            discovery_opportunities=discovery_opportunities,
+                            discovery_probability=discovery_probability,
+                        )
                     )
-                )

-        self._clear_shared_record_array()
+            self._clear_shared_record_array()
+
+        except Exception as e:
+            if clear_on_failure:
+                self._clear_shared_record_array()
+            raise e

         findable_flattened = [item for sublist in findable_lists for item in sublist]

@@ -998,6 +1018,7 @@ def run(
         by_object: bool = False,
         ignore_after_discovery: bool = False,
         num_jobs: Optional[int] = 1,
+        clear_on_failure: bool = True,
     ) -> Tuple[pd.DataFrame, pd.DataFrame]:
         """
         Run the findability metric on the observations.
@@ -1039,6 +1060,9 @@ def run(
         num_jobs : int, optional
             The number of jobs to run in parallel. If 1, then run in serial. If None, then use the number
             of CPUs on the machine.
+        clear_on_failure : bool, optional
+            If True, clear the shared memory array when an exception is raised.
+            If False, leave the shared memory array allocated so it can be inspected after a failure.

         Returns
         -------
@@ -1070,6 +1094,7 @@ def run(
                 discovery_probability=discovery_probability,
                 ignore_after_discovery=ignore_after_discovery,
                 num_jobs=num_jobs,
+                clear_on_failure=clear_on_failure,
             )
         else:
             findable = self.run_by_window(
@@ -1078,6 +1103,7 @@ def run(
                 discovery_opportunities=discovery_opportunities,
                 discovery_probability=discovery_probability,
                 num_jobs=num_jobs,
+                clear_on_failure=clear_on_failure,
             )

         window_summary = self._create_window_summary(observations, windows, findable)

From 220db8f2edcff5a3faf0f2e3a76649316df11f34 Mon Sep 17 00:00:00 2001
From: Joachim Moeyens
Date: Wed, 26 Jul 2023 07:57:20 -0700
Subject: [PATCH 3/3] Add test of shared memory record array storing and
 clearing

---
 difi/metrics.py            | 10 ++++++++++
 difi/tests/test_metrics.py | 36 ++++++++++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/difi/metrics.py b/difi/metrics.py
index 86aa8b1..85c7fac 100644
--- a/difi/metrics.py
+++ b/difi/metrics.py
@@ -574,8 +574,18 @@ def _store_as_shared_record_array(self, object_ids, obs_ids, times, ra, dec, nig
         return

     def _clear_shared_record_array(self):
+        """
+        Clears the shared memory array for this instance of the metric.
+
+        Returns
+        -------
+        None
+        """
         shared_mem = shared_memory.SharedMemory(self._shared_memory_name)
         shared_mem.unlink()
+        self._shared_memory_name = None
+        self._num_observations = 0
+        self._dtypes = None

     def _run_object_worker(
         self,
diff --git a/difi/tests/test_metrics.py b/difi/tests/test_metrics.py
index ab701ee..bcecfc1 100644
--- a/difi/tests/test_metrics.py
+++ b/difi/tests/test_metrics.py
@@ -1,3 +1,5 @@
+import os
+
 import numpy as np
 import pytest

@@ -486,3 +488,37 @@ def test_calcFindableMinObs_assertion(test_observations):
     with pytest.raises(AssertionError):
         metric = MinObsMetric()
         metric.determine_object_findable(test_observations)
+
+
+def test_FindabilityMetrics_shared_memory(test_observations):
+    # Check that the function stores the observations in shared memory under
+    # the correct name
+    metric = MinObsMetric()
+
+    # Extract the data from the test observations
+    object_ids = test_observations["object_id"].values
+    obs_ids = test_observations["obs_id"].values
+    time = test_observations["time"].values
+    ra = test_observations["ra"].values
+    dec = test_observations["dec"].values
+    night = test_observations["night"].values
+
+    # Store the observations in shared memory
+    metric._store_as_shared_record_array(object_ids, obs_ids, time, ra, dec, night)
+
+    # Check that the shared memory array has the correct name
+    assert metric._shared_memory_name == f"DIFI_ARRAY_{os.getpid()}"
+    assert metric._num_observations == len(test_observations)
+    assert metric._dtypes == [
+        ("object_id", object_ids.dtype),
+        ("obs_id", obs_ids.dtype),
+        ("time", np.float64),
+        ("ra", np.float64),
+        ("dec", np.float64),
+        ("night", np.int64),
+    ]
+
+    metric._clear_shared_record_array()
+    assert metric._shared_memory_name is None
+    assert metric._num_observations == 0
+    assert metric._dtypes is None
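
Taken together, the three patches manage a single multiprocessing.shared_memory lifecycle: create a block under a name that is unique to the current process, keep that name on the instance so workers can attach to it, and unlink the block afterwards (always on success, and on failure only when clear_on_failure is True). The standalone sketch below illustrates that lifecycle outside of difi; the SharedRecordStore class and worker function are hypothetical stand-ins, and only the DIFI_ARRAY_{pid} naming scheme and the clear_on_failure semantics are taken from the patches above.

import os

import numpy as np
from multiprocessing import shared_memory


class SharedRecordStore:
    """Hypothetical helper mirroring the pattern in the patches above: the
    shared memory block's name is an instance attribute suffixed with the
    PID so concurrent processes do not collide."""

    def __init__(self):
        self._shared_memory_name = None

    def store(self, array: np.ndarray) -> None:
        # Create the block under a per-process name and copy the data in.
        self._shared_memory_name = f"DIFI_ARRAY_{os.getpid()}"
        shm = shared_memory.SharedMemory(
            self._shared_memory_name, create=True, size=array.nbytes
        )
        view = np.ndarray(array.shape, dtype=array.dtype, buffer=shm.buf)
        view[:] = array[:]
        del view  # drop the exported buffer before closing this handle
        shm.close()  # the block itself stays alive until unlink()

    def clear(self) -> None:
        # unlink() frees the block; reset the name so it cannot go stale.
        shm = shared_memory.SharedMemory(self._shared_memory_name)
        shm.unlink()
        shm.close()
        self._shared_memory_name = None

    def run(self, work, clear_on_failure: bool = True):
        # Mirror the clear_on_failure semantics: always clear on success,
        # optionally leave the block allocated for inspection when work() raises.
        try:
            result = work(self._shared_memory_name)
            self.clear()
            return result
        except Exception:
            if clear_on_failure:
                self.clear()
            raise


def worker(name: str) -> float:
    # A worker attaches to the existing block by name and reads it back.
    shm = shared_memory.SharedMemory(name=name)
    data = np.ndarray((4,), dtype=np.float64, buffer=shm.buf)
    total = float(data.sum())
    del data
    shm.close()
    return total


if __name__ == "__main__":
    store = SharedRecordStore()
    store.store(np.arange(4, dtype=np.float64))
    print(store.run(worker))  # 6.0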