Skip to content

Commit

Permalink
Wait for NFS file synchronization
Browse files Browse the repository at this point in the history
Ensure that the files generated by a job match their expected checksums. It waits for the job's runpath to appear in the scheduler's checksum dictionary, verifies the checksums of the files, and logs any errors or discrepancies.
  • Loading branch information
DanSava committed Jun 12, 2024
1 parent c5b64a9 commit c1d2e21
Show file tree
Hide file tree
Showing 17 changed files with 528 additions and 34 deletions.
15 changes: 15 additions & 0 deletions src/_ert_forward_model_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from _ert_forward_model_runner.reporting.base import Reporter
from _ert_forward_model_runner.reporting.message import (
_JOB_EXIT_FAILED_STRING,
Checksum,
Exited,
Finish,
Init,
Expand All @@ -29,11 +30,13 @@
_FORWARD_MODEL_START = "com.equinor.ert.forward_model_job.start"
_FORWARD_MODEL_RUNNING = "com.equinor.ert.forward_model_job.running"
_FORWARD_MODEL_SUCCESS = "com.equinor.ert.forward_model_job.success"
_FORWARD_MODEL_CHECKSUM = "com.equinor.ert.forward_model_job.checksum"
_FORWARD_MODEL_FAILURE = "com.equinor.ert.forward_model_job.failure"

_CONTENT_TYPE = "datacontenttype"
_JOB_MSG_TYPE = "type"
_JOB_SOURCE = "source"
_RUN_PATH = "run_path"

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -66,6 +69,7 @@ def __init__(self, evaluator_url, token=None, cert_path=None):
self._statemachine = StateMachine()
self._statemachine.add_handler((Init,), self._init_handler)
self._statemachine.add_handler((Start, Running, Exited), self._job_handler)
self._statemachine.add_handler((Checksum,), self._checksum_handler)
self._statemachine.add_handler((Finish,), self._finished_handler)

self._ens_id = None
Expand Down Expand Up @@ -193,3 +197,14 @@ def _finished_handler(self, msg):
)
if self._event_publisher_thread.is_alive():
self._event_publisher_thread.join()

def _checksum_handler(self, msg):
    """Publish the forward-model checksum message as a cloud event.

    The event carries the per-file checksum dictionary (``msg.data``) and
    the runpath it belongs to, so the scheduler can later verify the files.
    """
    source = f"/ert/ensemble/{self._ens_id}/real/{self._real_id}"
    attributes = {
        _JOB_MSG_TYPE: _FORWARD_MODEL_CHECKSUM,
        _JOB_SOURCE: source,
        _CONTENT_TYPE: "application/json",
        _RUN_PATH: msg.run_path,
    }
    self._dump_event(attributes=attributes, data=msg.data)
18 changes: 17 additions & 1 deletion src/_ert_forward_model_runner/reporting/message.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
import dataclasses
from datetime import datetime as dt
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Dict, Literal, Optional, TypedDict

import psutil

if TYPE_CHECKING:
from _ert_forward_model_runner.job import Job

class _ChecksumDictBase(TypedDict):
    """Required keys of a checksum entry."""

    # Only "file" entries exist at present.
    type: Literal["file"]
    # Absolute path of the file the entry describes.
    path: str

class ChecksumDict(_ChecksumDictBase, total=False):
    """Checksum entry for a single expected output file.

    Exactly one of the optional keys is normally set: ``md5sum`` when the
    file existed and was hashed, ``error`` when it was not created.
    """

    md5sum: str
    error: str


_JOB_STATUS_SUCCESS = "Success"
_JOB_STATUS_RUNNING = "Running"
_JOB_STATUS_FAILURE = "Failure"
Expand Down Expand Up @@ -119,3 +128,10 @@ class Exited(Message):
def __init__(self, job, exit_code):
    """Exit message for *job*; ``exit_code`` is the job's process return code."""
    super().__init__(job)
    self.exit_code = exit_code


class Checksum(Message):
    """Message carrying the checksum manifest for a runpath.

    ``checksum_dict`` maps output-file names to their checksum entries; it is
    ``None`` when the run failed and no manifest was produced.
    """

    def __init__(
        self, checksum_dict: Optional[Dict[str, "ChecksumDict"]], run_path: str
    ):
        super().__init__()
        # Per-file checksum information (or None on failure).
        self.data = checksum_dict
        # Runpath the checksums belong to, used as the routing key downstream.
        self.run_path = run_path
7 changes: 5 additions & 2 deletions src/_ert_forward_model_runner/reporting/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Callable, Dict, Tuple, Type

from _ert_forward_model_runner.reporting.message import (
Checksum,
Exited,
Finish,
Init,
Expand All @@ -22,12 +23,14 @@ def __init__(self) -> None:
logger.debug("Initializing state machines")
initialized = (Init,)
jobs = (Start, Running, Exited)
checksum = (Checksum,)
finished = (Finish,)
self._handler: Dict[Message, Callable[[Message], None]] = {}
self._transitions = {
None: initialized,
initialized: jobs + finished,
jobs: jobs + finished,
initialized: jobs + checksum + finished,
jobs: jobs + checksum + finished,
checksum: checksum + finished,
}
self._state = None

Expand Down
31 changes: 29 additions & 2 deletions src/_ert_forward_model_runner/runner.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import hashlib
import json
import os
from pathlib import Path

from _ert_forward_model_runner.job import Job
from _ert_forward_model_runner.reporting.message import Finish, Init
from _ert_forward_model_runner.reporting.message import Checksum, Finish, Init


class ForwardModelRunner:
def __init__(self, jobs_data):
self.jobs_data = jobs_data
self.simulation_id = jobs_data.get("run_id")
self.experiment_id = jobs_data.get("experiment_id")
self.ens_id = jobs_data.get("ens_id")
Expand All @@ -23,14 +27,34 @@ def __init__(self, jobs_data):

self._set_environment()

def _read_manifest(self):
if not Path("manifest.json").exists():
return None
with open("manifest.json", mode="r", encoding="utf-8") as f:
data = json.load(f)
return {
name: {"type": "file", "path": str(Path(file).absolute())}
for name, file in data.items()
}

def _populate_checksums(self, manifest):
if not manifest:
return None
for info in manifest.values():
path = Path(info["path"])
if path.exists():
info["md5sum"] = hashlib.md5(path.read_bytes()).hexdigest()
else:
info["error"] = f"Expected file {path} not created by forward model!"
return manifest

def run(self, names_of_jobs_to_run):
# if names_of_jobs_to_run, create job_queue which contains jobs that
# are to be run.
if not names_of_jobs_to_run:
job_queue = self.jobs
else:
job_queue = [j for j in self.jobs if j.name() in names_of_jobs_to_run]

init_message = Init(
job_queue,
self.simulation_id,
Expand All @@ -56,9 +80,12 @@ def run(self, names_of_jobs_to_run):
yield status_update

if not status_update.success():
yield Checksum(checksum_dict=None, run_path=os.getcwd())
yield Finish().with_error("Not all jobs completed successfully.")
return

checksum_dict = self._populate_checksums(self._read_manifest())
yield Checksum(checksum_dict=checksum_dict, run_path=os.getcwd())
yield Finish()

def _set_environment(self):
Expand Down
27 changes: 27 additions & 0 deletions src/ert/config/ert_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,33 @@ def _create_list_of_forward_model_steps_to_run(
def forward_model_step_name_list(self) -> List[str]:
return [j.name for j in self.forward_model_steps]

def manifest_to_json(self, iens: int = 0, iter: int = 0) -> Dict[str, Any]:
    """Build the expected-output-file manifest for one realization.

    Maps each expected parameter/response file name to the path the forward
    model should produce it at, for realization *iens* at iteration *iter*.
    (``iter`` shadows the builtin, but is part of the public signature.)
    """
    manifest: Dict[str, Any] = {}
    # Forward-init parameter files are only produced on the first iteration.
    if iter == 0:
        for name, parameter_config in self.ensemble_config.parameter_configs.items():
            if parameter_config.forward_init and parameter_config.forward_init_file:
                manifest[name] = parameter_config.forward_init_file.replace(
                    "%d", str(iens)
                )
    # Expected response files.
    for name, response_config in self.ensemble_config.response_configs.items():
        input_file = str(response_config.input_file)
        if isinstance(response_config, SummaryConfig):
            input_file = input_file.replace("<IENS>", str(iens))
            manifest[f"{name}_UNSMRY"] = f"{input_file}.UNSMRY"
            manifest[f"{name}_SMSPEC"] = f"{input_file}.SMSPEC"
        if isinstance(response_config, GenDataConfig):
            # NOTE(review): "%d" is substituted with the realization index
            # here -- confirm this matches how GenData files are named.
            if response_config.report_steps and iens in response_config.report_steps:
                manifest[name] = input_file.replace("%d", str(iens))
            elif "%d" not in input_file:
                manifest[name] = input_file
    return manifest

def forward_model_data_to_json(
self,
run_id: Optional[str] = None,
Expand Down
4 changes: 4 additions & 0 deletions src/ert/enkf_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ def create_run_path(
)

json.dump(forward_model_output, fptr)
# Write MANIFEST file to runpath; used to avoid NFS sync issues
with open(run_path / "manifest.json", mode="w", encoding="utf-8") as fptr:
data = ert_config.manifest_to_json(run_arg.iens, run_arg.itr)
json.dump(data, fptr)

run_context.runpaths.write_runpath_list(
[run_context.iteration], run_context.active_realizations
Expand Down
45 changes: 32 additions & 13 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
EVTYPE_ENSEMBLE_FAILED,
EVTYPE_ENSEMBLE_STARTED,
EVTYPE_ENSEMBLE_STOPPED,
EVTYPE_FORWARD_MODEL_CHECKSUM,
)
from .snapshot import PartialSnapshot
from .state import (
Expand Down Expand Up @@ -100,6 +101,18 @@ def config(self) -> EvaluatorServerConfig:
def ensemble(self) -> Ensemble:
return self._ensemble

async def forward_checksum(self, event: CloudEvent) -> None:
    """Re-emit a dispatcher checksum event to all connected clients.

    The outgoing event keys the checksum payload by the originating
    runpath so receivers can match it to a realization.
    """
    attributes = {
        "type": EVTYPE_FORWARD_MODEL_CHECKSUM,
        "source": f"/ert/ensemble/{self.ensemble.id_}",
    }
    payload = {event["run_path"]: event.data}
    forward_event = CloudEvent(attributes, payload)
    message = to_json(forward_event, data_marshaller=evaluator_marshaller).decode()
    await self._send_message(message)

def _fm_handler(self, events: List[CloudEvent]) -> None:
with self._snapshot_mutex:
snapshot_update_event = self.ensemble.update_snapshot(events)
Expand Down Expand Up @@ -171,18 +184,7 @@ async def _send_snapshot_update(
EVTYPE_EE_SNAPSHOT_UPDATE,
snapshot_update_event.to_dict(),
)
if message and self._clients:
# Note return_exceptions=True in gather. This fire-and-forget
# approach is currently how we deal with failures when trying
# to send updates to clients. Rationale is that if sending to
# the client fails, the websocket is down and we have no way
# to re-establish it. Thus, it becomes the responsibility of
# the client to re-connect if necessary, in which case the first
# update it receives will be a full snapshot.
await asyncio.gather(
*[client.send(message) for client in self._clients],
return_exceptions=True,
)
await self._send_message(message)

def _create_cloud_event(
self,
Expand Down Expand Up @@ -282,7 +284,10 @@ async def handle_dispatch(
)
continue
try:
await self._dispatcher.handle_event(event)
if event["type"] == EVTYPE_FORWARD_MODEL_CHECKSUM:
await self.forward_checksum(event)
else:
await self._dispatcher.handle_event(event)
except BaseException as ex:
# Exceptions include asyncio.InvalidStateError, and
# anything that self._*_handler() can raise (updates
Expand Down Expand Up @@ -430,3 +435,17 @@ def get_successful_realizations(self) -> List[int]:
def _get_ens_id(source: str) -> str:
# the ens_id will be found at /ert/ensemble/ens_id/...
return source.split("/")[3]

async def _send_message(self, message: Optional[str] = None) -> None:
    """Broadcast *message* to every connected client, best-effort."""
    if not message or not self._clients:
        return
    # Fire-and-forget (return_exceptions=True): if a send fails, the
    # websocket is down and cannot be re-established from this side.
    # It is then the client's responsibility to re-connect, in which
    # case the first update it receives will be a full snapshot.
    pending = [client.send(message) for client in self._clients]
    await asyncio.gather(*pending, return_exceptions=True)
1 change: 1 addition & 0 deletions src/ert/ensemble_evaluator/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
EVTYPE_FORWARD_MODEL_RUNNING = "com.equinor.ert.forward_model_job.running"
EVTYPE_FORWARD_MODEL_SUCCESS = "com.equinor.ert.forward_model_job.success"
EVTYPE_FORWARD_MODEL_FAILURE = "com.equinor.ert.forward_model_job.failure"
EVTYPE_FORWARD_MODEL_CHECKSUM = "com.equinor.ert.forward_model_job.checksum"


EVGROUP_REALIZATION = {
Expand Down
2 changes: 2 additions & 0 deletions src/ert/event_type_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@
EVTYPE_ENSEMBLE_STOPPED = "com.equinor.ert.ensemble.stopped"
EVTYPE_ENSEMBLE_CANCELLED = "com.equinor.ert.ensemble.cancelled"
EVTYPE_ENSEMBLE_FAILED = "com.equinor.ert.ensemble.failed"

EVTYPE_FORWARD_MODEL_CHECKSUM = "com.equinor.ert.forward_model_job.checksum"
47 changes: 47 additions & 0 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import hashlib
import logging
import time
import uuid
Expand Down Expand Up @@ -136,6 +137,8 @@ async def run(self, sem: asyncio.BoundedSemaphore, max_submit: int = 2) -> None:
break

if self.returncode.result() == 0:
if self._scheduler.wait_for_checksum():
await self._verify_checksum()
await self._handle_finished_forward_model()
break

Expand Down Expand Up @@ -167,6 +170,50 @@ async def _max_runtime_task(self) -> None:
)
self.returncode.cancel()

async def _verify_checksum(self, timeout: int = 120) -> None:
    """Verify the forward model's output files against reported checksums.

    Waits for checksum information for this job's runpath to arrive in the
    scheduler, then for the listed files to appear on disk (NFS sync), and
    finally compares each file's md5 digest with the expected value.
    *timeout* is a total budget in seconds shared by both wait loops.
    Problems are only logged; this method never raises.
    """
    # Wait for job runpath to be in the checksum dictionary
    runpath = self.real.run_arg.runpath
    while runpath not in self._scheduler.checksum:
        if timeout <= 0:
            break
        timeout -= 1
        await asyncio.sleep(1)

    checksum = self._scheduler.checksum.get(runpath)
    if checksum is None:
        # No checksum event arrived within the timeout; nothing to verify.
        logger.warning(f"Checksum information not received for {runpath}")
        return

    # Entries carrying an "error" key mark files the forward model never created.
    errors = "\n".join(
        [info["error"] for info in checksum.values() if "error" in info]
    )
    if errors:
        logger.error(errors)

    valid_checksums = [info for info in checksum.values() if "error" not in info]

    # Wait for files in checksum
    while not all(Path(info["path"]).exists() for info in valid_checksums):
        if timeout <= 0:
            break
        timeout -= 1
        logger.debug("Waiting for disk synchronization")
        await asyncio.sleep(1)

    for info in valid_checksums:
        file_path = Path(info["path"])
        expected_md5sum = info.get("md5sum")
        if file_path.exists() and expected_md5sum:
            actual_md5sum = hashlib.md5(file_path.read_bytes()).hexdigest()
            if expected_md5sum == actual_md5sum:
                logger.debug(f"File {file_path} checksum successful.")
            else:
                logger.warning(f"File {file_path} checksum verification failed.")
        elif file_path.exists() and expected_md5sum is None:
            # Entry was present but carried no md5sum -- nothing to compare.
            logger.warning(f"Checksum not received for file {file_path}")
        else:
            # File still missing after the wait loop above.
            logger.error(f"Disk synchronization failed for {file_path}")

async def _handle_finished_forward_model(self) -> None:
callback_status, status_msg = forward_model_ok(self.real.run_arg)
if self._callback_status_msg:
Expand Down
Loading

0 comments on commit c1d2e21

Please sign in to comment.