diff --git a/python/job_runner/cli.py b/python/job_runner/cli.py index f361c3a68e..612f0ad026 100644 --- a/python/job_runner/cli.py +++ b/python/job_runner/cli.py @@ -15,7 +15,7 @@ def _setup_reporters(is_interactive_run, ee_id): if is_interactive_run: reporters.append(reporting.Interactive()) elif ee_id: - reporters.append(reporting.File()) + reporters.append(reporting.File(sync_disc_timeout=0)) reporters.append(reporting.Network()) reporters.append(reporting.Event()) else: diff --git a/python/job_runner/reporting/file.py b/python/job_runner/reporting/file.py index f6890a7297..33fa097f7a 100644 --- a/python/job_runner/reporting/file.py +++ b/python/job_runner/reporting/file.py @@ -24,11 +24,12 @@ class File(object): OK_file = "OK" STATUS_json = "status.json" - def __init__(self): + def __init__(self, sync_disc_timeout=10): self.status_dict = {} self.node = socket.gethostname() + self._sync_disc_timeout = sync_disc_timeout - def report(self, msg, sync_disc_timeout=10): + def report(self, msg): job_status = {} if msg.job: index = msg.job.index @@ -81,7 +82,7 @@ def report(self, msg, sync_disc_timeout=10): self.status_dict["end_time"] = data_util.datetime_serialize( msg.timestamp ) - self._dump_ok_file(sync_disc_timeout) + self._dump_ok_file() else: # this has already been handled by earlier event pass @@ -172,7 +173,7 @@ def _dump_error_file(self, job, error_msg): fileH.write("\n") fileH.close() - def _dump_ok_file(self, sync_disc_timeout): + def _dump_ok_file(self): now = time.localtime() with open(self.OK_file, "w") as f: f.write( @@ -180,7 +181,7 @@ def _dump_ok_file(self, sync_disc_timeout): now.tm_hour, now.tm_min, now.tm_sec ) ) - time.sleep(sync_disc_timeout) # Let the disks sync up + time.sleep(self._sync_disc_timeout) # Let the disks sync up def _dump_status_json(self): with open(self.STATUS_json, "w") as fp: diff --git a/python/res/enkf/enkf_simulation_runner.py b/python/res/enkf/enkf_simulation_runner.py index 60dde0059a..19f45f7e5d 100644 --- a/python/res/enkf/enkf_simulation_runner.py +++ b/python/res/enkf/enkf_simulation_runner.py @@ -12,8 +12,6 @@ import time import threading -LONG_RUNNING_FACTOR = 1.25 - class EnkfSimulationRunner(BaseCClass): TYPE_NAME = "enkf_simulation_runner" @@ -94,43 +92,26 @@ def runWorkflows(runtime, ert): hook_manager = ert.getHookManager() hook_manager.runWorkflows(runtime, ert) - def add_job(self, run_arg, res_config, job_queue, max_runtime): - job_name = run_arg.job_name - run_path = run_arg.runpath - job_script = res_config.queue_config.job_script - num_cpu = res_config.queue_config.num_cpu - if num_cpu == 0: - num_cpu = res_config.ecl_config.num_cpu - - job = JobQueueNode( - job_script=job_script, - job_name=job_name, - run_path=run_path, - num_cpu=num_cpu, - status_file=job_queue.status_file, - ok_file=job_queue.ok_file, - exit_file=job_queue.exit_file, - done_callback_function=EnKFState.forward_model_ok_callback, - exit_callback_function=EnKFState.forward_model_exit_callback, - callback_arguments=[run_arg, res_config], - max_runtime=max_runtime, - ) - - if job is None: - return - run_arg._set_queue_index(job_queue.add_job(job)) - def start_queue(self, run_context, job_queue): max_runtime = self._enkf_main().analysisConfig().get_max_runtime() if max_runtime == 0: max_runtime = None + done_callback_function = EnKFState.forward_model_ok_callback + exit_callback_function = EnKFState.forward_model_exit_callback + # submit jobs for i in range(len(run_context)): if not run_context.is_active(i): continue run_arg = run_context[i] - self.add_job(run_arg, self._enkf_main().resConfig(), job_queue, max_runtime) + job_queue.add_job_from_run_arg( + run_arg, + self._enkf_main().resConfig(), + max_runtime, + done_callback_function, + exit_callback_function, + ) job_queue.submit_complete() queue_evaluators = None @@ -140,30 +121,10 @@ def start_queue(self, run_context, job_queue): ): queue_evaluators = [ partial( - EnkfSimulationRunner.stop_long_running_jobs, - job_queue, + job_queue.stop_long_running_jobs, self._enkf_main().analysisConfig().minimum_required_realizations, ) ] jqm = JobQueueManager(job_queue, queue_evaluators) jqm.execute_queue() - - @staticmethod - def stop_long_running_jobs(job_queue, minimum_required_realizations): - finished_realizations = job_queue.count_status(JobStatusType.JOB_QUEUE_DONE) - if finished_realizations < minimum_required_realizations: - return - - completed_jobs = [ - job - for job in job_queue.job_list - if job.status == JobStatusType.JOB_QUEUE_DONE - ] - average_runtime = sum([job.runtime for job in completed_jobs]) / float( - len(completed_jobs) - ) - - for job in job_queue.job_list: - if job.runtime > LONG_RUNNING_FACTOR * average_runtime: - job.stop() diff --git a/python/res/enkf/enkf_state.py b/python/res/enkf/enkf_state.py index cc98f4fe4e..b86bb5c365 100644 --- a/python/res/enkf/enkf_state.py +++ b/python/res/enkf/enkf_state.py @@ -16,7 +16,6 @@ from cwrap import BaseCClass from res import ResPrototype from res.enkf.enums import EnkfInitModeEnum, EnkfVarType -from res.job_queue import JobStatusType class EnKFState(BaseCClass): diff --git a/python/res/job_queue/job_queue_manager.py b/python/res/job_queue/job_queue_manager.py index 143f406647..8b70ea35fb 100644 --- a/python/res/job_queue/job_queue_manager.py +++ b/python/res/job_queue/job_queue_manager.py @@ -17,13 +17,14 @@ Module implementing a queue for managing external jobs. """ -from res.job_queue import Job, JobStatusType, ThreadStatus +from res.job_queue import JobStatusType from threading import BoundedSemaphore -import time CONCURRENT_INTERNALIZATION = 10 +# TODO: there's no need for this class, all the behavior belongs in the queue +# class proper. class JobQueueManager: def __init__(self, queue, queue_evaluators=None): self._queue = queue @@ -89,52 +90,5 @@ def __repr__(self): status = "waiting=%d, running=%d, success=%d, failed=%d" % (nw, nr, ns, nf) return "JobQueueManager(%s, %s)" % (status, ir) - def max_running(self): - if self.queue.get_max_running() == 0: - return len(self.queue.job_list) - else: - return self.queue.get_max_running() - - def _available_capacity(self): - return ( - not self.queue.stopped and self.queue.count_running() < self.max_running() - ) - - def _launch_jobs(self): - # Start waiting jobs - while self._available_capacity(): - job = self.queue.fetch_next_waiting() - if job is None: - break - job.run( - driver=self.queue.driver, - pool_sema=self._pool_sema, - max_submit=self.queue.max_submit, - ) - - def _stop_jobs(self): - for job in self.queue.job_list: - job.stop() - while self.queue.is_active(): - time.sleep(1) - - def _assert_complete(self): - for job in self.queue.job_list: - if job.thread_status != ThreadStatus.DONE: - msg = "Unexpected job status type after running job: {} with thread status: {}" - raise AssertionError(msg.format(job.status, job.thread_status)) - def execute_queue(self): - while self.queue.is_active() and not self.queue.stopped: - self._launch_jobs() - - time.sleep(1) - - if self._queue_evaluators is not None: - for func in self._queue_evaluators: - func() - - if self.queue.stopped: - self._stop_jobs() - - self._assert_complete() + self._queue.execute_queue(self._pool_sema, self._queue_evaluators) diff --git a/python/res/job_queue/queue.py b/python/res/job_queue/queue.py index 62badf018a..d7f64b4c3b 100644 --- a/python/res/job_queue/queue.py +++ b/python/res/job_queue/queue.py @@ -20,6 +20,7 @@ from __future__ import absolute_import, division, print_function, unicode_literals +import asyncio import sys import time import ctypes @@ -27,7 +28,9 @@ from cwrap import BaseCClass from res import ResPrototype -from res.job_queue import Job, JobStatusType, ThreadStatus +from res.job_queue import Job, JobStatusType, ThreadStatus, JobQueueNode + +LONG_RUNNING_FACTOR = 1.25 class JobQueue(BaseCClass): @@ -309,3 +312,118 @@ def add_job(self, job): def count_running(self): return sum(job.thread_status == ThreadStatus.RUNNING for job in self.job_list) + + def max_running(self): + if self.get_max_running() == 0: + return len(self.job_list) + else: + return self.get_max_running() + + def available_capacity(self): + return not self.stopped and self.count_running() < self.max_running() + + def stop_jobs(self): + for job in self.job_list: + job.stop() + while self.is_active(): + time.sleep(1) + + async def stop_jobs_async(self): + for job in self.job_list: + job.stop() + while self.is_active(): + await asyncio.sleep(1) + + def assert_complete(self): + for job in self.job_list: + if job.thread_status != ThreadStatus.DONE: + msg = "Unexpected job status type after running job: {} with thread status: {}" + raise AssertionError(msg.format(job.status, job.thread_status)) + + def launch_jobs(self, pool_sema): + # Start waiting jobs + while self.available_capacity(): + job = self.fetch_next_waiting() + if job is None: + break + job.run( + driver=self.driver, + pool_sema=pool_sema, + max_submit=self.max_submit, + ) + + def execute_queue(self, pool_sema, evaluators): + while self.is_active() and not self.stopped: + self.launch_jobs(pool_sema) + + time.sleep(1) + + if evaluators is not None: + for func in evaluators: + func() + + if self.stopped: + self.stop_jobs() + + self.assert_complete() + + def add_job_from_run_arg(self, run_arg, res_config, max_runtime, ok_cb, exit_cb): + job_name = run_arg.job_name + run_path = run_arg.runpath + job_script = res_config.queue_config.job_script + num_cpu = res_config.queue_config.num_cpu + if num_cpu == 0: + num_cpu = res_config.ecl_config.num_cpu + + job = JobQueueNode( + job_script=job_script, + job_name=job_name, + run_path=run_path, + num_cpu=num_cpu, + status_file=self.status_file, + ok_file=self.ok_file, + exit_file=self.exit_file, + done_callback_function=ok_cb, + exit_callback_function=exit_cb, + callback_arguments=[run_arg, res_config], + max_runtime=max_runtime, + ) + + if job is None: + return + run_arg._set_queue_index(self.add_job(job)) + + def add_ee_stage(self, stage): + job = JobQueueNode( + job_script=stage.get_job_script(), + job_name=stage.get_job_name(), + run_path=stage.get_run_path(), + num_cpu=stage.get_num_cpu(), + status_file=self.status_file, + ok_file=self.ok_file, + exit_file=self.exit_file, + done_callback_function=stage.get_done_callback(), + exit_callback_function=stage.get_exit_callback(), + callback_arguments=stage.get_callback_arguments(), + max_runtime=stage.get_max_runtime(), + ) + if job is None: + raise ValueError("JobQueueNode constructor created None job") + + stage.get_run_arg()._set_queue_index(self.add_job(job)) + + def stop_long_running_jobs(self, minimum_required_realizations): + finished_realizations = self.count_status(JobStatusType.JOB_QUEUE_DONE) + if finished_realizations < minimum_required_realizations: + return + + completed_jobs = [ + job for job in self.job_list if job.status == JobStatusType.JOB_QUEUE_DONE + ] + average_runtime = sum([job.runtime for job in completed_jobs]) / float( + len(completed_jobs) + ) + + for job in self.job_list: + if job.runtime > LONG_RUNNING_FACTOR * average_runtime: + job.stop() diff --git a/python/tests/job_runner/test_file_reporter.py b/python/tests/job_runner/test_file_reporter.py index 65cc434ae4..de51303c57 100644 --- a/python/tests/job_runner/test_file_reporter.py +++ b/python/tests/job_runner/test_file_reporter.py @@ -10,7 +10,7 @@ class FileReporterTests(TestCase): def setUp(self): - self.reporter = File() + self.reporter = File(sync_disc_timeout=0) @tmpdir(None) def test_report_with_init_message_argument(self): @@ -177,7 +177,7 @@ def test_report_with_successful_finish_message_argument(self): msg.timestamp, 0, [] ) - self.reporter.report(msg, sync_disc_timeout=0) + self.reporter.report(msg) with open(self.reporter.OK_file, "r") as f: self.assertIn( diff --git a/python/tests/res/enkf/test_enkf_simulation_runner.py b/python/tests/res/enkf/test_enkf_simulation_runner.py index 5b22cb96de..c211424f76 100644 --- a/python/tests/res/enkf/test_enkf_simulation_runner.py +++ b/python/tests/res/enkf/test_enkf_simulation_runner.py @@ -1,17 +1,5 @@ -from res.enkf import EnkfSimulationRunner -from res.job_queue import JobQueueNode, JobStatusType, JobQueue, JobQueueManager -from res.job_queue import ThreadStatus, Driver, QueueDriverEnum -from tests.utils import tmpdir -from unittest import TestCase -import os, stat, random - - -class MockedQueue: - def __init__(self, job_list): - self.job_list = job_list - - def count_status(self, status): - return len([x for x in self.job_list if x.status == status]) +from res.job_queue import JobStatusType, JobQueue +from unittest import TestCase, mock class MockedJob: @@ -27,6 +15,9 @@ def runtime(self): def stop(self): self.status = JobStatusType.JOB_QUEUE_FAILED + def convertToCReference(self, _): + pass + class EnkfSimulationRunnerTest(TestCase): def test_stop_long_running(self): @@ -52,9 +43,18 @@ def test_stop_long_running(self): job_list[i]._start_time = 0 job_list[i]._end_time = 5 - queue = MockedQueue(job_list) + # The driver is of no consequence, so resolving it in the c layer is + # uninteresting and mocked out. + with mock.patch("res.job_queue.JobQueue._set_driver"): + queue = JobQueue(mock.MagicMock()) + + # We don't need the c layer call here, we only need it added to + # the queue's job_list. + with mock.patch("res.job_queue.JobQueue._add_job"): + for job in job_list: + queue.add_job(job) - EnkfSimulationRunner.stop_long_running_jobs(queue, 5) + queue.stop_long_running_jobs(5) for i in range(5): assert job_list[i].status == JobStatusType.JOB_QUEUE_DONE