Skip to content

Commit

Permalink
Merge pull request #2344 from ganga-devs/runMonitoring_fix
Browse files Browse the repository at this point in the history
Re-add runMonitoring
  • Loading branch information
mesmith75 authored Jun 10, 2024
2 parents dbe7f8a + 9ee763f commit a676195
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
31 changes: 30 additions & 1 deletion ganga/GangaCore/Core/MonitoringComponent/MonitoringService.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def _check_active_backends(self, job_slice=None):
log.debug("RegistryLockError: The job was most likely removed")
log.debug("Reg LockError%s" % str(err))

if job_slice and len(found_active_backends)==0:
log.debug("No active backends found with a job slice. Turning off the monitoring loop")
self.enabled = False
return

# If a backend is newly found as active, trigger its monitoring
previously_active_backends = self.active_backends
self.active_backends = found_active_backends
Expand All @@ -84,7 +89,7 @@ def _check_active_backends(self, job_slice=None):
if previously_active_backends:
self._cleanup_finished_backends(previously_active_backends, found_active_backends)

self.loop.call_later(POLL_RATE, self._check_active_backends)
self.loop.call_later(POLL_RATE, self._check_active_backends, job_slice)

def _log_backend_summary(self, active_backends):
summary = "{"
Expand Down Expand Up @@ -226,3 +231,27 @@ def stop(self):
log.error(err)
self._cleanup_scheduled_tasks()
self.loop.call_soon_threadsafe(self.loop.stop)

def runMonitoring(self, jobs=None):
    """
    Enable the monitoring loop and schedule a check of the active backends.
    Parameters:
     jobs: a registry slice to be monitored (None -> all jobs)
    Return:
     False, if the monitoring service has already been stopped
     True, once the backend check has been scheduled on the monitoring loop
    Note:
     This method is meant to be used in Ganga scripts to request monitoring on demand.
     It only schedules the check on the event loop; it does not wait for the
     scheduled check to run or for any monitoring step to finish.
    """
    log.debug("runMonitoring")
    if not self.alive:
        log.error("Cannot run the monitoring loop. It has already been stopped")
        return False

    self.enabled = True
    # NOTE(review): a new executor is created on every call; whether a previously
    # assigned executor has to be shut down first is not visible from here - confirm.
    self.thread_executor = ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE)
    self._cleanup_dirty_jobs()
    # call_soon_threadsafe forwards positional arguments to the callback, so the
    # job slice can be passed directly without wrapping it in functools.partial.
    self.loop.call_soon_threadsafe(self._check_active_backends, jobs)
    return True
2 changes: 1 addition & 1 deletion ganga/GangaCore/Core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,4 @@ def bootstrap(reg_slice, interactive_session, my_interface=None):
import GangaCore.GPI
my_interface = GangaCore.GPI

exportToInterface(my_interface, 'runMonitoring', monitoring_component.enable, 'Functions')
exportToInterface(my_interface, 'runMonitoring', monitoring_component.runMonitoring, 'Functions')

0 comments on commit a676195

Please sign in to comment.