Skip to content
This repository has been archived by the owner on Jul 19, 2021. It is now read-only.

Commit

Permalink
Solve ert #1385 by implementing differ on queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jondequinor committed Feb 11, 2021
1 parent 51e3e5b commit 951c2f5
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 18 deletions.
166 changes: 159 additions & 7 deletions python/res/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,53 @@
"""

import asyncio
import sys
import copy
import json
import logging
import time
import ctypes
import typing

import websockets
from cloudevents.http import CloudEvent, to_json
from cwrap import BaseCClass

from job_runner import JOBS_FILE
from res import ResPrototype
from res.job_queue import Job, JobStatusType, ThreadStatus, JobQueueNode
from res.job_queue import JobQueueNode, JobStatusType, ThreadStatus

logger = logging.getLogger(__name__)

LONG_RUNNING_FACTOR = 1.25

_FM_STAGE_WAITING = "com.equinor.ert.forward_model_stage.waiting"
_FM_STAGE_PENDING = "com.equinor.ert.forward_model_stage.pending"
_FM_STAGE_RUNNING = "com.equinor.ert.forward_model_stage.running"
_FM_STAGE_FAILURE = "com.equinor.ert.forward_model_stage.failure"
_FM_STAGE_SUCCESS = "com.equinor.ert.forward_model_stage.success"
_FM_STAGE_UNKNOWN = "com.equinor.ert.forward_model_stage.unknown"

_queue_state_to_event_type_map = {
"JOB_QUEUE_NOT_ACTIVE": _FM_STAGE_WAITING,
"JOB_QUEUE_WAITING": _FM_STAGE_WAITING,
"JOB_QUEUE_SUBMITTED": _FM_STAGE_WAITING,
"JOB_QUEUE_PENDING": _FM_STAGE_PENDING,
"JOB_QUEUE_RUNNING": _FM_STAGE_RUNNING,
"JOB_QUEUE_DONE": _FM_STAGE_RUNNING,
"JOB_QUEUE_EXIT": _FM_STAGE_RUNNING,
"JOB_QUEUE_IS_KILLED": _FM_STAGE_FAILURE,
"JOB_QUEUE_DO_KILL": _FM_STAGE_FAILURE,
"JOB_QUEUE_SUCCESS": _FM_STAGE_SUCCESS,
"JOB_QUEUE_RUNNING_DONE_CALLBACK": _FM_STAGE_RUNNING,
"JOB_QUEUE_RUNNING_EXIT_CALLBACK": _FM_STAGE_RUNNING,
"JOB_QUEUE_STATUS_FAILURE": _FM_STAGE_UNKNOWN,
"JOB_QUEUE_FAILED": _FM_STAGE_FAILURE,
"JOB_QUEUE_DO_KILL_NODE_FAILURE": _FM_STAGE_FAILURE,
"JOB_QUEUE_UNKNOWN": _FM_STAGE_UNKNOWN,
}


def _queue_state_event_type(state):
return _queue_state_to_event_type_map[state]


class JobQueue(BaseCClass):
# If the queue is created with size == 0 that means that it will
Expand Down Expand Up @@ -130,6 +166,8 @@ def __init__(self, driver, max_submit=2, size=0):

self.driver = driver
self._set_driver(driver.from_param(driver))
self._qindex_to_iens = {}
self._state = []

def kill_job(self, queue_index):
"""
Expand Down Expand Up @@ -302,10 +340,13 @@ def exit_file(self):
def status_file(self):
return self._get_status_file()

def add_job(self, job):
def add_job(self, job, iens):
job.convertToCReference(None)
queue_index = self._add_job(job)
self.job_list.append(job)
self._qindex_to_iens[queue_index] = iens
self._state.append(job.status.value)
assert len(self._state) - 1 == queue_index
return queue_index

def count_running(self):
Expand Down Expand Up @@ -359,11 +400,69 @@ def execute_queue(self, pool_sema, evaluators):
if evaluators is not None:
for func in evaluators:
func()
self._transition()

if self.stopped:
self.stop_jobs()

self.assert_complete()
self._transition()

@staticmethod
def _translate_change_to_cloudevent(real_id, status):
return CloudEvent(
{
"type": _queue_state_event_type(status),
"source": f"/ert/ee/{0}/real/{real_id}/stage/{0}",
"datacontenttype": "application/json",
},
{
"queue_event_type": status,
},
)

@staticmethod
async def _publish_changes(changes, websocket):
events = [
JobQueue._translate_change_to_cloudevent(real_id, status)
for real_id, status in changes.items()
]
for event in events:
await websocket.send(to_json(event))

async def execute_queue_async(self, ws_uri, pool_sema, evaluators):
async with websockets.connect(ws_uri) as websocket:
await JobQueue._publish_changes(self.snapshot(), websocket)

try:
while self.is_active() and not self.stopped:
self.launch_jobs(pool_sema)

await asyncio.sleep(1)

if evaluators is not None:
for func in evaluators:
func()

await JobQueue._publish_changes(
self._changes_after_transition(), websocket
)
except asyncio.CancelledError:
if self.stopped:
logger.debug(
"observed that the queue had stopped after cancellation, stopping jobs..."
)
self.stop_jobs()
logger.debug("jobs now stopped (after cancellation)")
raise

if self.stopped:
logger.debug("observed that the queue had stopped, stopping jobs...")
await self.stop_jobs_async()
logger.debug("jobs now stopped")
self.assert_complete()
self._transition()
await JobQueue._publish_changes(self.snapshot(), websocket)

def add_job_from_run_arg(self, run_arg, res_config, max_runtime, ok_cb, exit_cb):
job_name = run_arg.job_name
Expand All @@ -389,7 +488,7 @@ def add_job_from_run_arg(self, run_arg, res_config, max_runtime, ok_cb, exit_cb)

if job is None:
return
run_arg._set_queue_index(self.add_job(job))
run_arg._set_queue_index(self.add_job(job, run_arg.iens))

def add_ee_stage(self, stage):
job = JobQueueNode(
Expand All @@ -408,7 +507,8 @@ def add_ee_stage(self, stage):
if job is None:
raise ValueError("JobQueueNode constructor created None job")

stage.get_run_arg()._set_queue_index(self.add_job(job))
iens = stage.get_run_arg().iens
stage.get_run_arg()._set_queue_index(self.add_job(job, iens))

def stop_long_running_jobs(self, minimum_required_realizations):
finished_realizations = self.count_status(JobStatusType.JOB_QUEUE_DONE)
Expand All @@ -425,3 +525,55 @@ def stop_long_running_jobs(self, minimum_required_realizations):
for job in self.job_list:
if job.runtime > LONG_RUNNING_FACTOR * average_runtime:
job.stop()

def _transition(
self,
) -> typing.Tuple[typing.List[JobStatusType], typing.List[JobStatusType]]:
"""Transition to a new state, return both old and new state."""
new_state = [job.status.value for job in self.job_list]
old_state = copy.copy(self._state)
self._state = new_state
return old_state, new_state

def _diff_states(
self,
old_state: typing.List[JobStatusType],
new_state: typing.List[JobStatusType],
) -> typing.Dict[int, str]:
"""Return the diff between old_state and new_state."""
changes = {}

diff = list(map(lambda s: s[0] == s[1], zip(old_state, new_state)))
if len(diff) > 0:
for q_index, equal in enumerate(diff):
if not equal:
st = str(JobStatusType(new_state[q_index]))
changes[self._qindex_to_iens[q_index]] = st
return changes

def _changes_after_transition(self) -> typing.Dict[int, str]:
old_state, new_state = self._transition()
return self._diff_states(old_state, new_state)

def snapshot(self) -> typing.Optional[typing.Dict[int, str]]:
"""Return the whole state, or None if there was no snapshot."""
snapshot = {}
for q_index, state_val in enumerate(self._state):
st = str(JobStatusType(state_val))
try:
snapshot[self._qindex_to_iens[q_index]] = st
except KeyError as e:
logger.debug(f"differ could produce no snapshot due to {e}")
return None
return snapshot

def add_ensemble_evaluator_information_to_jobs_file(self, ee_id):
for q_index, q_node in enumerate(self.job_list):
with open(f"{q_node.run_path}/{JOBS_FILE}", "r+") as jobs_file:
data = json.load(jobs_file)
data["ee_id"] = ee_id
data["real_id"] = self._qindex_to_iens[q_index]
data["stage_id"] = 0
jobs_file.seek(0)
jobs_file.truncate()
json.dump(data, jobs_file, indent=4)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def get_data_files():
"psutil",
"pyyaml",
"requests",
"websockets",
],
entry_points={
"console_scripts": ["job_dispatch.py = job_runner.job_dispatch:main"]
Expand Down
11 changes: 8 additions & 3 deletions tests/res/enkf/test_enkf_simulation_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,22 @@ def test_stop_long_running(self):

# We don't need the c layer call here, we only need it added to
# the queue's job_list.
with mock.patch("res.job_queue.JobQueue._add_job"):
for job in job_list:
queue.add_job(job)
with mock.patch("res.job_queue.JobQueue._add_job") as _add_job:
for idx, job in enumerate(job_list):
_add_job.return_value = idx
queue.add_job(job, idx)

queue.stop_long_running_jobs(5)
queue._transition()

for i in range(5):
assert job_list[i].status == JobStatusType.JOB_QUEUE_DONE
assert queue.snapshot()[i] == str(JobStatusType.JOB_QUEUE_DONE)

for i in range(5, 8):
assert job_list[i].status == JobStatusType.JOB_QUEUE_FAILED
assert queue.snapshot()[i] == str(JobStatusType.JOB_QUEUE_FAILED)

for i in range(8, 10):
assert job_list[i].status == JobStatusType.JOB_QUEUE_RUNNING
assert queue.snapshot()[i] == str(JobStatusType.JOB_QUEUE_RUNNING)
26 changes: 20 additions & 6 deletions tests/res/job_queue/test_job_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def create_queue(script, max_submit=1, max_runtime=None):
max_runtime=max_runtime,
)

job_queue.add_job(job)
job_queue.add_job(job, i)

return job_queue

Expand Down Expand Up @@ -96,8 +96,14 @@ def test_kill_jobs(self):

wait_until(lambda: self.assertFalse(job_queue.is_active()))

for job in job_queue.job_list:
job_queue._transition()

for q_index, job in enumerate(job_queue.job_list):
assert job.status == JobStatusType.JOB_QUEUE_IS_KILLED
iens = job_queue._qindex_to_iens[q_index]
assert job_queue.snapshot()[iens] == str(
JobStatusType.JOB_QUEUE_IS_KILLED
)

for job in job_queue.job_list:
job.wait_for()
Expand Down Expand Up @@ -138,12 +144,14 @@ def test_failing_jobs(self):
for job in job_queue.job_list:
job.wait_for()

job_queue._transition()

assert job_queue.fetch_next_waiting() is None

for job in job_queue.job_list:
for q_index, job in enumerate(job_queue.job_list):
assert job.status == JobStatusType.JOB_QUEUE_FAILED

assert True
iens = job_queue._qindex_to_iens[q_index]
assert job_queue.snapshot()[iens] == str(JobStatusType.JOB_QUEUE_FAILED)

def test_timeout_jobs(self):
with TestAreaContext("job_queue_test_kill") as work_area:
Expand All @@ -160,8 +168,14 @@ def test_timeout_jobs(self):

wait_until(lambda: self.assertFalse(job_queue.is_active()))

for job in job_queue.job_list:
job_queue._transition()

for q_index, job in enumerate(job_queue.job_list):
assert job.status == JobStatusType.JOB_QUEUE_IS_KILLED
iens = job_queue._qindex_to_iens[q_index]
assert job_queue.snapshot()[iens] == str(
JobStatusType.JOB_QUEUE_IS_KILLED
)

for job in job_queue.job_list:
job.wait_for()
4 changes: 2 additions & 2 deletions tests/res/job_queue/test_job_queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def create_queue(script, max_submit=2):
os.path.realpath(dummy_config["run_path"].format(i)),
],
)
job_queue.add_job(job)

job_queue.add_job(job, i)
job_queue.submit_complete()
return job_queue


Expand Down

0 comments on commit 951c2f5

Please sign in to comment.