Skip to content
This repository has been archived by the owner on Jul 19, 2021. It is now read-only.

Commit

Permalink
Support first iteration of ensemble evaluator
Browse files Browse the repository at this point in the history
- Add an Event reporter to job_runner so that it produces events
- Slightly refactor the job_runner cli to accommodate the need for
  reporters to vary
  • Loading branch information
jondequinor committed Nov 6, 2020
1 parent d05f2c4 commit a1487a7
Show file tree
Hide file tree
Showing 14 changed files with 516 additions and 119 deletions.
33 changes: 26 additions & 7 deletions python/job_runner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,26 @@
import os
import signal
import sys
import json

import job_runner.reporting as reporting
from job_runner.reporting.message import Finish
from job_runner.runner import JobRunner
from job_runner import JOBS_FILE


def _setup_reporters(is_interactive_run, ee_id):
    """Build the list of reporters appropriate for this run.

    Interactive runs report only to the terminal. All other runs get File
    and Network reporting, with Event reporting added when an ensemble
    evaluator id (``ee_id``) is present.
    """
    if is_interactive_run:
        return [reporting.Interactive()]
    # File and Network reporting is common to every non-interactive run.
    reporters = [reporting.File(), reporting.Network()]
    if ee_id:
        reporters.append(reporting.Event())
    return reporters


def main(args):
Expand All @@ -28,15 +44,18 @@ def main(args):
sys.exit("No such directory: {}".format(parsed_args.run_path))
os.chdir(parsed_args.run_path)

reporters = []
ee_id = None
try:
with open(JOBS_FILE, "r") as json_file:
jobs_data = json.load(json_file)
ee_id = jobs_data.get("ee_id")
except ValueError as e:
raise IOError("Job Runner cli failed to load JSON-file.{}".format(str(e)))

if len(parsed_args.job) > 0:
reporters.append(reporting.Interactive())
else:
reporters.append(reporting.File())
reporters.append(reporting.Network())
is_interactive_run = len(parsed_args.job) > 0
reporters = _setup_reporters(is_interactive_run, ee_id)

job_runner = JobRunner()
job_runner = JobRunner(jobs_data)

for job_status in job_runner.run(parsed_args.job):
for reporter in reporters:
Expand Down
1 change: 1 addition & 0 deletions python/job_runner/reporting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
from .file import File
from .interactive import Interactive
from .network import Network
from .event import Event
186 changes: 186 additions & 0 deletions python/job_runner/reporting/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
from cloudevents.http import CloudEvent, to_json
from job_runner.reporting.message import (
Exited,
Finish,
Init,
Running,
Start,
)

_FM_JOB_START = "com.equinor.ert.forward_model_job.start"
_FM_JOB_RUNNING = "com.equinor.ert.forward_model_job.running"
_FM_JOB_SUCCESS = "com.equinor.ert.forward_model_job.success"
_FM_JOB_FAILURE = "com.equinor.ert.forward_model_job.failure"

_FM_STEP_START = "com.equinor.ert.forward_model_step.start"
_FM_STEP_FAILURE = "com.equinor.ert.forward_model_step.failure"
_FM_STEP_SUCCESS = "com.equinor.ert.forward_model_step.success"


class TransitionError(ValueError):
    """Raised when a reported message is not a legal successor of the
    reporter's current state."""


class Event:
    """Reporter that serializes forward-model lifecycle messages as CloudEvents.

    A small state machine (Init -> job messages -> Finish) validates the
    order of reported messages; each accepted message is written as one
    JSON-encoded CloudEvent per line, appended to ``event_log``.
    """

    def __init__(self, event_log="event_log"):
        # Path of the newline-delimited JSON file that events are appended to.
        self._event_log = event_log

        # Identifiers are unknown until the Init message is reported.
        self._ee_id = None
        self._real_id = None
        self._stage_id = None

        self._initialize_state_machine()
        self._clear_log()

    def _initialize_state_machine(self):
        # States are keyed by the tuple of message types that belong to them,
        # so membership can be tested with isinstance() directly.
        initialized = (Init,)
        jobs = (Start, Running, Exited)
        finished = (Finish,)
        self._states = {
            initialized: self._init_handler,
            jobs: self._job_handler,
            finished: self._end_handler,
        }
        # Legal successor states; `finished` is terminal and deliberately has
        # no outgoing transitions.
        self._transitions = {
            None: initialized,
            initialized: jobs + finished,
            jobs: jobs + finished,
        }
        self._state = None

    def _clear_log(self):
        # Truncate (or create) the event log so every run starts empty.
        with open(self._event_log, "w"):
            pass

    def report(self, msg):
        """Validate *msg* against the state machine and emit its event(s).

        Raises:
            TransitionError: if *msg* is not a legal successor of the
                previously reported message.
        """
        new_state = None
        for state in self._states:
            if isinstance(msg, state):
                new_state = state

        # Use .get() so that reporting after Finish (a terminal state with no
        # outgoing transitions) raises TransitionError instead of a KeyError
        # while the error message is being formatted.
        allowed = self._transitions.get(self._state)
        if allowed is None or not isinstance(msg, allowed):
            raise TransitionError(
                f"Illegal transition {self._state} -> {new_state} for {msg}, "
                f"expected to transition into {allowed}"
            )

        self._states[new_state](msg)
        self._state = new_state

    def _dump_event(self, event):
        # One JSON-serialized CloudEvent per line (JSON Lines format).
        with open(self._event_log, "a") as el:
            el.write("{}\n".format(to_json(event).decode()))

    def _step_path(self):
        # Only a single step (index 0) exists per stage for now.
        return (
            f"/ert/ee/{self._ee_id}/real/{self._real_id}"
            f"/stage/{self._stage_id}/step/0"
        )

    def _init_handler(self, msg):
        # The Init message carries the ids that all later event source paths
        # are built from.
        self._ee_id = msg.ee_id
        self._real_id = msg.real_id
        self._stage_id = msg.stage_id
        self._dump_event(
            CloudEvent(
                {
                    "type": _FM_STEP_START,
                    "source": self._step_path(),
                    "datacontenttype": "application/json",
                },
                {
                    "jobs": [job.job_data for job in msg.jobs],
                },
            )
        )

    def _job_handler(self, msg):
        job_path = f"{self._step_path()}/job/{msg.job.index}"

        if isinstance(msg, Start):
            self._dump_event(
                CloudEvent(
                    {
                        "type": _FM_JOB_START,
                        "source": job_path,
                    },
                    None,
                )
            )
            # A Start message may itself carry a failure (the job could not
            # be launched); emit the failure event right after the start one.
            if not msg.success():
                self._dump_event(
                    CloudEvent(
                        {
                            "type": _FM_JOB_FAILURE,
                            "source": job_path,
                            "datacontenttype": "application/json",
                        },
                        {
                            "error_msg": msg.error_message,
                        },
                    )
                )

        elif isinstance(msg, Exited):
            if msg.success():
                self._dump_event(
                    CloudEvent(
                        {
                            "type": _FM_JOB_SUCCESS,
                            "source": job_path,
                        },
                        None,
                    )
                )
            else:
                self._dump_event(
                    CloudEvent(
                        {
                            "type": _FM_JOB_FAILURE,
                            "source": job_path,
                            "datacontenttype": "application/json",
                        },
                        {
                            "exit_code": msg.exit_code,
                            "error_msg": msg.error_message,
                        },
                    )
                )

        elif isinstance(msg, Running):
            self._dump_event(
                CloudEvent(
                    {
                        "type": _FM_JOB_RUNNING,
                        "source": job_path,
                        "datacontenttype": "application/json",
                    },
                    {
                        "max_memory_usage": msg.max_memory_usage,
                        "current_memory_usage": msg.current_memory_usage,
                    },
                )
            )

    def _end_handler(self, msg):
        step_path = self._step_path()
        if msg.success():
            self._dump_event(
                CloudEvent(
                    {
                        "type": _FM_STEP_SUCCESS,
                        "source": step_path,
                    }
                )
            )
        else:
            self._dump_event(
                CloudEvent(
                    {
                        "type": _FM_STEP_FAILURE,
                        "source": step_path,
                        "datacontenttype": "application/json",
                    },
                    {
                        "error_msg": msg.error_message,
                    },
                )
            )
55 changes: 24 additions & 31 deletions python/job_runner/reporting/file.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import json
import os
import shutil
import socket
import time

from job_runner.io import cond_unlink
from job_runner.reporting.message import Exited, Finish, Init, Running, Start
from job_runner.reporting.message import (
_JOB_STATUS_FAILURE,
_JOB_STATUS_RUNNING,
_JOB_STATUS_SUCCESS,
Exited,
Finish,
Init,
Running,
Start,
)
from job_runner.util import data as data_util


class File(object):
Expand Down Expand Up @@ -36,25 +45,25 @@ def report(self, msg, sync_disc_timeout=10):
if msg.success():
self._start_status_file(msg)
self._add_log_line(msg.job)
job_status["status"] = "Running"
job_status["start_time"] = self._datetime_serialize(msg.timestamp)
job_status["status"] = _JOB_STATUS_RUNNING
job_status["start_time"] = data_util.datetime_serialize(msg.timestamp)
else:
error_msg = msg.error_message
job_status["status"] = "Failure"
job_status["status"] = _JOB_STATUS_FAILURE
job_status["error"] = error_msg
job_status["end_time"] = self._datetime_serialize(msg.timestamp)
job_status["end_time"] = data_util.datetime_serialize(msg.timestamp)

self._complete_status_file(msg)
elif isinstance(msg, Exited):
job_status["end_time"] = self._datetime_serialize(msg.timestamp)
job_status["end_time"] = data_util.datetime_serialize(msg.timestamp)

if msg.success():
job_status["status"] = "Success"
job_status["status"] = _JOB_STATUS_SUCCESS
self._complete_status_file(msg)
else:
error_msg = msg.error_message
job_status["error"] = error_msg
job_status["status"] = "Failure"
job_status["status"] = _JOB_STATUS_FAILURE

# A STATUS_file is not written if there is no exit_code, i.e.
# when the job is killed due to timeout.
Expand All @@ -65,11 +74,13 @@ def report(self, msg, sync_disc_timeout=10):
elif isinstance(msg, Running):
job_status["max_memory_usage"] = msg.max_memory_usage
job_status["current_memory_usage"] = msg.current_memory_usage
job_status["status"] = "Running"
job_status["status"] = _JOB_STATUS_RUNNING

elif isinstance(msg, Finish):
if msg.success():
self.status_dict["end_time"] = self._datetime_serialize(msg.timestamp)
self.status_dict["end_time"] = data_util.datetime_serialize(
msg.timestamp
)
self._dump_ok_file(sync_disc_timeout)
else:
# this has already been handled by earlier event
Expand All @@ -89,27 +100,9 @@ def _init_status_file(self):
def _init_job_status_dict(self, start_time, run_id, jobs):
return {
"run_id": run_id,
"start_time": self._datetime_serialize(start_time),
"end_time": None,
"jobs": [self._create_job_dict(j) for j in jobs],
}

def _datetime_serialize(self, dt):
if dt is None:
return None
return time.mktime(dt.timetuple())

def _create_job_dict(self, job):
return {
"name": job.name(),
"status": "Waiting",
"error": None,
"start_time": None,
"start_time": data_util.datetime_serialize(start_time),
"end_time": None,
"stdout": job.std_out,
"stderr": job.std_err,
"current_memory_usage": None,
"max_memory_usage": None,
"jobs": [data_util.create_job_dict(j) for j in jobs],
}

def _start_status_file(self, msg):
Expand Down
24 changes: 22 additions & 2 deletions python/job_runner/reporting/message.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,29 @@
from datetime import datetime as dt

_JOB_STATUS_SUCCESS = "Success"
_JOB_STATUS_RUNNING = "Running"
_JOB_STATUS_FAILURE = "Failure"
_JOB_STATUS_WAITING = "Waiting"

class Message(object):
_RUNNER_STATUS_INITIALIZED = "Initialized"
_RUNNER_STATUS_SUCCESS = "Success"
_RUNNER_STATUS_FAILURE = "Failure"


class _MetaMessage(type):
def __repr__(cls):
return f"MessageType<{cls.__name__}>"


class Message(metaclass=_MetaMessage):
def __init__(self, job=None):
self.timestamp = dt.now()
self.job = job
self.error_message = None

def __repr__(self):
return type(self).__name__

    def with_error(self, message):
        """Attach *message* as this message's error and return self,
        allowing fluent construction (e.g. ``Exited(job).with_error(...)``)."""
        self.error_message = message
        return self
Expand All @@ -19,11 +36,14 @@ def success(self):


class Init(Message):
def __init__(self, jobs, run_id, ert_pid):
def __init__(self, jobs, run_id, ert_pid, ee_id=None, real_id=None, stage_id=None):
super(Init, self).__init__()
self.jobs = jobs
self.run_id = run_id
self.ert_pid = ert_pid
self.ee_id = ee_id
self.real_id = real_id
self.stage_id = stage_id


class Finish(Message):
Expand Down
Loading

0 comments on commit a1487a7

Please sign in to comment.