Skip to content
This repository has been archived by the owner on Jul 19, 2021. It is now read-only.

Commit

Permalink
Refactoring to ease introduction of EnsembleEvaluator
Browse files Browse the repository at this point in the history
Accomplishes this mainly by
    - Moving methods rightfully belonging to the JobQueue out of
      JobQueueManager and EnkfSimulationRunner
    - Introducing add_ee_stage on the JobQueue which is capable of
      adding Ensemble stages to the queue
  • Loading branch information
jondequinor committed Nov 27, 2020
1 parent c31b2b5 commit 976792c
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 126 deletions.
2 changes: 1 addition & 1 deletion python/job_runner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def _setup_reporters(is_interactive_run, ee_id):
if is_interactive_run:
reporters.append(reporting.Interactive())
elif ee_id:
reporters.append(reporting.File())
reporters.append(reporting.File(sync_disc_timeout=0))
reporters.append(reporting.Network())
reporters.append(reporting.Event())
else:
Expand Down
11 changes: 6 additions & 5 deletions python/job_runner/reporting/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ class File(object):
OK_file = "OK"
STATUS_json = "status.json"

def __init__(self):
def __init__(self, sync_disc_timeout=10):
self.status_dict = {}
self.node = socket.gethostname()
self._sync_disc_timeout = sync_disc_timeout

def report(self, msg, sync_disc_timeout=10):
def report(self, msg):
job_status = {}
if msg.job:
index = msg.job.index
Expand Down Expand Up @@ -81,7 +82,7 @@ def report(self, msg, sync_disc_timeout=10):
self.status_dict["end_time"] = data_util.datetime_serialize(
msg.timestamp
)
self._dump_ok_file(sync_disc_timeout)
self._dump_ok_file()
else:
# this has already been handled by earlier event
pass
Expand Down Expand Up @@ -172,15 +173,15 @@ def _dump_error_file(self, job, error_msg):
fileH.write("</error>\n")
fileH.close()

def _dump_ok_file(self, sync_disc_timeout):
def _dump_ok_file(self):
now = time.localtime()
with open(self.OK_file, "w") as f:
f.write(
"All jobs complete {:02d}:{:02d}:{:02d} \n".format(
now.tm_hour, now.tm_min, now.tm_sec
)
)
time.sleep(sync_disc_timeout) # Let the disks sync up
time.sleep(self._sync_disc_timeout) # Let the disks sync up

def _dump_status_json(self):
with open(self.STATUS_json, "w") as fp:
Expand Down
61 changes: 11 additions & 50 deletions python/res/enkf/enkf_simulation_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import time
import threading

LONG_RUNNING_FACTOR = 1.25


class EnkfSimulationRunner(BaseCClass):
TYPE_NAME = "enkf_simulation_runner"
Expand Down Expand Up @@ -94,43 +92,26 @@ def runWorkflows(runtime, ert):
hook_manager = ert.getHookManager()
hook_manager.runWorkflows(runtime, ert)

def add_job(self, run_arg, res_config, job_queue, max_runtime):
job_name = run_arg.job_name
run_path = run_arg.runpath
job_script = res_config.queue_config.job_script
num_cpu = res_config.queue_config.num_cpu
if num_cpu == 0:
num_cpu = res_config.ecl_config.num_cpu

job = JobQueueNode(
job_script=job_script,
job_name=job_name,
run_path=run_path,
num_cpu=num_cpu,
status_file=job_queue.status_file,
ok_file=job_queue.ok_file,
exit_file=job_queue.exit_file,
done_callback_function=EnKFState.forward_model_ok_callback,
exit_callback_function=EnKFState.forward_model_exit_callback,
callback_arguments=[run_arg, res_config],
max_runtime=max_runtime,
)

if job is None:
return
run_arg._set_queue_index(job_queue.add_job(job))

def start_queue(self, run_context, job_queue):
max_runtime = self._enkf_main().analysisConfig().get_max_runtime()
if max_runtime == 0:
max_runtime = None

done_callback_function = EnKFState.forward_model_ok_callback
exit_callback_function = EnKFState.forward_model_exit_callback

# submit jobs
for i in range(len(run_context)):
if not run_context.is_active(i):
continue
run_arg = run_context[i]
self.add_job(run_arg, self._enkf_main().resConfig(), job_queue, max_runtime)
job_queue.add_job_from_run_arg(
run_arg,
self._enkf_main().resConfig(),
max_runtime,
done_callback_function,
exit_callback_function,
)

job_queue.submit_complete()
queue_evaluators = None
Expand All @@ -140,30 +121,10 @@ def start_queue(self, run_context, job_queue):
):
queue_evaluators = [
partial(
EnkfSimulationRunner.stop_long_running_jobs,
job_queue,
job_queue.stop_long_running_jobs,
self._enkf_main().analysisConfig().minimum_required_realizations,
)
]

jqm = JobQueueManager(job_queue, queue_evaluators)
jqm.execute_queue()

@staticmethod
def stop_long_running_jobs(job_queue, minimum_required_realizations):
finished_realizations = job_queue.count_status(JobStatusType.JOB_QUEUE_DONE)
if finished_realizations < minimum_required_realizations:
return

completed_jobs = [
job
for job in job_queue.job_list
if job.status == JobStatusType.JOB_QUEUE_DONE
]
average_runtime = sum([job.runtime for job in completed_jobs]) / float(
len(completed_jobs)
)

for job in job_queue.job_list:
if job.runtime > LONG_RUNNING_FACTOR * average_runtime:
job.stop()
1 change: 0 additions & 1 deletion python/res/enkf/enkf_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from cwrap import BaseCClass
from res import ResPrototype
from res.enkf.enums import EnkfInitModeEnum, EnkfVarType
from res.job_queue import JobStatusType


class EnKFState(BaseCClass):
Expand Down
54 changes: 4 additions & 50 deletions python/res/job_queue/job_queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
Module implementing a queue for managing external jobs.
"""
from res.job_queue import Job, JobStatusType, ThreadStatus
from res.job_queue import JobStatusType
from threading import BoundedSemaphore
import time

CONCURRENT_INTERNALIZATION = 10


# TODO: there's no need for this class, all the behavior belongs in the queue
# class proper.
class JobQueueManager:
def __init__(self, queue, queue_evaluators=None):
self._queue = queue
Expand Down Expand Up @@ -89,52 +90,5 @@ def __repr__(self):
status = "waiting=%d, running=%d, success=%d, failed=%d" % (nw, nr, ns, nf)
return "JobQueueManager(%s, %s)" % (status, ir)

def max_running(self):
if self.queue.get_max_running() == 0:
return len(self.queue.job_list)
else:
return self.queue.get_max_running()

def _available_capacity(self):
return (
not self.queue.stopped and self.queue.count_running() < self.max_running()
)

def _launch_jobs(self):
# Start waiting jobs
while self._available_capacity():
job = self.queue.fetch_next_waiting()
if job is None:
break
job.run(
driver=self.queue.driver,
pool_sema=self._pool_sema,
max_submit=self.queue.max_submit,
)

def _stop_jobs(self):
for job in self.queue.job_list:
job.stop()
while self.queue.is_active():
time.sleep(1)

def _assert_complete(self):
for job in self.queue.job_list:
if job.thread_status != ThreadStatus.DONE:
msg = "Unexpected job status type after running job: {} with thread status: {}"
raise AssertionError(msg.format(job.status, job.thread_status))

def execute_queue(self):
while self.queue.is_active() and not self.queue.stopped:
self._launch_jobs()

time.sleep(1)

if self._queue_evaluators is not None:
for func in self._queue_evaluators:
func()

if self.queue.stopped:
self._stop_jobs()

self._assert_complete()
self._queue.execute_queue(self._pool_sema, self._queue_evaluators)
120 changes: 119 additions & 1 deletion python/res/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@

from __future__ import absolute_import, division, print_function, unicode_literals

import asyncio
import sys
import time
import ctypes

from cwrap import BaseCClass

from res import ResPrototype
from res.job_queue import Job, JobStatusType, ThreadStatus
from res.job_queue import Job, JobStatusType, ThreadStatus, JobQueueNode

LONG_RUNNING_FACTOR = 1.25


class JobQueue(BaseCClass):
Expand Down Expand Up @@ -309,3 +312,118 @@ def add_job(self, job):

def count_running(self):
return sum(job.thread_status == ThreadStatus.RUNNING for job in self.job_list)

def max_running(self):
if self.get_max_running() == 0:
return len(self.job_list)
else:
return self.get_max_running()

def available_capacity(self):
return not self.stopped and self.count_running() < self.max_running()

def stop_jobs(self):
for job in self.job_list:
job.stop()
while self.is_active():
time.sleep(1)

async def stop_jobs_async(self):
for job in self.job_list:
job.stop()
while self.is_active():
await asyncio.sleep(1)

def assert_complete(self):
for job in self.job_list:
if job.thread_status != ThreadStatus.DONE:
msg = "Unexpected job status type after running job: {} with thread status: {}"
raise AssertionError(msg.format(job.status, job.thread_status))

def launch_jobs(self, pool_sema):
# Start waiting jobs
while self.available_capacity():
job = self.fetch_next_waiting()
if job is None:
break
job.run(
driver=self.driver,
pool_sema=pool_sema,
max_submit=self.max_submit,
)

def execute_queue(self, pool_sema, evaluators):
while self.is_active() and not self.stopped:
self.launch_jobs(pool_sema)

time.sleep(1)

if evaluators is not None:
for func in evaluators:
func()

if self.stopped:
self.stop_jobs()

self.assert_complete()

def add_job_from_run_arg(self, run_arg, res_config, max_runtime, ok_cb, exit_cb):
job_name = run_arg.job_name
run_path = run_arg.runpath
job_script = res_config.queue_config.job_script
num_cpu = res_config.queue_config.num_cpu
if num_cpu == 0:
num_cpu = res_config.ecl_config.num_cpu

job = JobQueueNode(
job_script=job_script,
job_name=job_name,
run_path=run_path,
num_cpu=num_cpu,
status_file=self.status_file,
ok_file=self.ok_file,
exit_file=self.exit_file,
done_callback_function=ok_cb,
exit_callback_function=exit_cb,
callback_arguments=[run_arg, res_config],
max_runtime=max_runtime,
)

if job is None:
return
run_arg._set_queue_index(self.add_job(job))

def add_ee_stage(self, stage):
job = JobQueueNode(
job_script=stage.get_job_script(),
job_name=stage.get_job_name(),
run_path=stage.get_run_path(),
num_cpu=stage.get_num_cpu(),
status_file=self.status_file,
ok_file=self.ok_file,
exit_file=self.exit_file,
done_callback_function=stage.get_done_callback(),
exit_callback_function=stage.get_exit_callback(),
callback_arguments=stage.get_callback_arguments(),
max_runtime=stage.get_max_runtime(),
)
if job is None:
raise ValueError("JobQueueNode constructor created None job")

stage.get_run_arg()._set_queue_index(self.add_job(job))

def stop_long_running_jobs(self, minimum_required_realizations):
finished_realizations = self.count_status(JobStatusType.JOB_QUEUE_DONE)
if finished_realizations < minimum_required_realizations:
return

completed_jobs = [
job for job in self.job_list if job.status == JobStatusType.JOB_QUEUE_DONE
]
average_runtime = sum([job.runtime for job in completed_jobs]) / float(
len(completed_jobs)
)

for job in self.job_list:
if job.runtime > LONG_RUNNING_FACTOR * average_runtime:
job.stop()
4 changes: 2 additions & 2 deletions python/tests/job_runner/test_file_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class FileReporterTests(TestCase):
def setUp(self):
self.reporter = File()
self.reporter = File(sync_disc_timeout=0)

@tmpdir(None)
def test_report_with_init_message_argument(self):
Expand Down Expand Up @@ -177,7 +177,7 @@ def test_report_with_successful_finish_message_argument(self):
msg.timestamp, 0, []
)

self.reporter.report(msg, sync_disc_timeout=0)
self.reporter.report(msg)

with open(self.reporter.OK_file, "r") as f:
self.assertIn(
Expand Down
Loading

0 comments on commit 976792c

Please sign in to comment.