Skip to content

Commit

Permalink
Report stderr from a failed realization the same way the C code did
Browse files Browse the repository at this point in the history
This reimplements job_queue_node_fscanf_EXIT in job_node.cpp

This introduces a small backward-incompatible change to the contents of
the ERROR file left in the runpath: the file is now valid XML, whereas
the file.py reporter previously produced invalid XML.
  • Loading branch information
berland committed Nov 24, 2023
1 parent 12e8ca1 commit 82ccc29
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 11 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies=[
"typing_extensions",
"jinja2",
"lark",
"lxml",
"matplotlib",
"numpy<2",
"PyQt5",
Expand Down
17 changes: 12 additions & 5 deletions src/_ert_job_runner/reporting/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ def _add_log_line(self, job):
time_str = time.strftime(TIME_FORMAT, time.localtime())
f.write(f"{time_str} Calling: {job.job_data['executable']} {args}\n")

# This file will be read by the job_queue_node_fscanf_EXIT() function
# in job_queue.c. Be very careful with changes in output format.
def _dump_error_file(self, job, error_msg):
with append(ERROR_file) as file:
file.write("<error>\n")
Expand All @@ -176,11 +174,20 @@ def _dump_error_file(self, job, error_msg):
if stderr:
stderr_file = os.path.join(os.getcwd(), job.std_err)
else:
stderr = f"<Not written by:{job.name()}>\n"
stderr = f"Not written by:{job.name()}\n"
else:
stderr = f"<stderr: Could not find file:{job.std_err}>\n"
stderr = f"stderr: Could not find file:{job.std_err}\n"
else:
stderr = "<stderr: Not redirected>\n"
stderr = "stderr: Not redirected\n"

# Escape XML characters
stderr = (
stderr.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
.replace("'", "&apos;")
)

file.write(f" <stderr>\n{stderr}</stderr>\n")
if stderr_file:
Expand Down
13 changes: 10 additions & 3 deletions src/ert/job_queue/driver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
import os
import shlex
import shutil
Expand All @@ -12,6 +13,9 @@
from ert.job_queue import RealizationState


logger = logging.getLogger(__name__)


class Driver(ABC):
def __init__(
self,
Expand Down Expand Up @@ -86,14 +90,17 @@ async def submit(self, realization: "RealizationState") -> None:

# Wait for process to finish:
output, error = await process.communicate()
print(output)
print(error)
if process.returncode == 0:
if output:
logger.info(output)
realization.runend()
else:
if output:
logger.error(output)
if error:
logger.error(error)
if str(realization.current_state.id) == "RUNNING": # (circular import..)
realization.runfail()
# TODO: fetch stdout/stderr

async def poll_statuses(self) -> None:
pass
Expand Down
21 changes: 21 additions & 0 deletions src/ert/job_queue/realization_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import logging
import pathlib
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Optional

from lxml import etree
from statemachine import State, StateMachine

from ert.constant_filenames import ERROR_file, STATUS_file
Expand Down Expand Up @@ -61,6 +63,7 @@ def __init__(
self.iens: int = realization.run_arg.iens
self.start_time: Optional[datetime.datetime] = None
self.retries_left: int = retries
self._max_submit = retries + 1
self._callback_status_msg: Optional[str] = None
super().__init__()

Expand Down Expand Up @@ -128,6 +131,24 @@ def on_enter_EXIT(self) -> None:
self.retry()
self.retries_left -= 1
else:
logger.error(
f"Realization: {self.realization.run_arg.iens} "
f"failed after reaching max submit ({self._max_submit}):"
)
exit_file_path = (
Path(self.realization.run_arg.runpath) / self.realization.exit_file
)
if exit_file_path.exists():
exit_file = etree.parse(exit_file_path)
failed_job = exit_file.find("job").text
error_reason = exit_file.find("reason").text
stderr_capture = exit_file.find("stderr").text
stderr_file = exit_file.find("stderr_file").text
logger.error(
f"job {failed_job} failed with: '{error_reason}'\n"
f"\tstderr file: '{stderr_file}',\n"
f"\tits contents:{stderr_capture}"
)
self.invalidate()

def on_enter_DONE(self) -> None:
Expand Down
5 changes: 2 additions & 3 deletions test-data/poly_example/poly.ert
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
JOBNAME poly_%d

QUEUE_SYSTEM LOCAL
QUEUE_OPTION LOCAL MAX_RUNNING 5
--QUEUE_SYSTEM LSF
QUEUE_OPTION LOCAL MAX_RUNNING 50

RUNPATH poly_out/realization-<IENS>/iter-<ITER>

OBS_CONFIG observations

NUM_REALIZATIONS 20
NUM_REALIZATIONS 100
MIN_REALIZATIONS 1

GEN_KW COEFFS coeff_priors
Expand Down

0 comments on commit 82ccc29

Please sign in to comment.