Skip to content

Commit

Permalink
Remove step states from snapshots.
Browse files (browse the repository at this point in the history)
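The step state is moved up one level, onto the realization: a realization's status, start time and end time are now updated directly from realization events instead of being derived from a separate per-realization step state.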
  • Loading branch information
berland committed Nov 1, 2023
1 parent 129f8ea commit e7ec249
Show file tree
Hide file tree
Showing 17 changed files with 159 additions and 260 deletions.
2 changes: 0 additions & 2 deletions src/_ert_job_runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ def __init__(self, jobs_data):
self.experiment_id = jobs_data.get("experiment_id")
self.ens_id = jobs_data.get("ens_id")
self.real_id = jobs_data.get("real_id")
self.step_id = jobs_data.get("step_id")
self.ert_pid = jobs_data.get("ert_pid")
self.global_environment = jobs_data.get("global_environment")
job_data_list = jobs_data["jobList"]
Expand Down Expand Up @@ -38,7 +37,6 @@ def run(self, names_of_jobs_to_run):
self.ert_pid,
self.ens_id,
self.real_id,
self.step_id,
self.experiment_id,
)

Expand Down
2 changes: 0 additions & 2 deletions src/ert/ensemble_evaluator/_builder/_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
RealizationSnapshot,
Snapshot,
SnapshotDict,
Step,
)
from ert.serialization import evaluator_marshaller

Expand Down Expand Up @@ -161,7 +160,6 @@ def _create_snapshot(self) -> Snapshot:
active=True,
status=state.REALIZATION_STATE_WAITING,
)
reals[str(real.iens)].step = Step(status=state.STEP_STATE_UNKNOWN)
for job in real.jobs:
reals[str(real.iens)].jobs[str(job.id_)] = Job(
status=state.JOB_STATE_START,
Expand Down
2 changes: 1 addition & 1 deletion src/ert/ensemble_evaluator/_builder/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def setup_timeout_callback(
) -> Tuple[Callable[[int], None], asyncio.Task[None]]:
def on_timeout(iens: int) -> None:
timeout_queue.put_nowait(
event_generator(identifiers.EVTYPE_FM_STEP_TIMEOUT, iens)
event_generator(identifiers.EVTYPE_REALIZATION_TIMEOUT, iens)
)

async def send_timeout_message() -> None:
Expand Down
32 changes: 16 additions & 16 deletions src/ert/ensemble_evaluator/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,28 @@
STDOUT = "stdout"
STEPS = "steps"

EVTYPE_FM_STEP_FAILURE = "com.equinor.ert.forward_model_step.failure"
EVTYPE_FM_STEP_PENDING = "com.equinor.ert.forward_model_step.pending"
EVTYPE_FM_STEP_RUNNING = "com.equinor.ert.forward_model_step.running"
EVTYPE_FM_STEP_SUCCESS = "com.equinor.ert.forward_model_step.success"
EVTYPE_FM_STEP_UNKNOWN = "com.equinor.ert.forward_model_step.unknown"
EVTYPE_FM_STEP_WAITING = "com.equinor.ert.forward_model_step.waiting"
EVTYPE_FM_STEP_TIMEOUT = "com.equinor.ert.forward_model_step.timeout"
EVTYPE_REALIZATION_FAILURE = "com.equinor.ert.realization.failure"
EVTYPE_REALIZATION_PENDING = "com.equinor.ert.realization.pending"
EVTYPE_REALIZATION_RUNNING = "com.equinor.ert.realization.running"
EVTYPE_REALIZATION_SUCCESS = "com.equinor.ert.realization.success"
EVTYPE_REALIZATION_UNKNOWN = "com.equinor.ert.realization.unknown"
EVTYPE_REALIZATION_WAITING = "com.equinor.ert.realization.waiting"
EVTYPE_REALIZATION_TIMEOUT = "com.equinor.ert.realization.timeout"

EVTYPE_FM_JOB_START = "com.equinor.ert.forward_model_job.start"
EVTYPE_FM_JOB_RUNNING = "com.equinor.ert.forward_model_job.running"
EVTYPE_FM_JOB_SUCCESS = "com.equinor.ert.forward_model_job.success"
EVTYPE_FM_JOB_FAILURE = "com.equinor.ert.forward_model_job.failure"


EVGROUP_FM_STEP = {
EVTYPE_FM_STEP_FAILURE,
EVTYPE_FM_STEP_PENDING,
EVTYPE_FM_STEP_RUNNING,
EVTYPE_FM_STEP_SUCCESS,
EVTYPE_FM_STEP_UNKNOWN,
EVTYPE_FM_STEP_WAITING,
EVTYPE_FM_STEP_TIMEOUT,
EVGROUP_REALIZATION = {
EVTYPE_REALIZATION_FAILURE,
EVTYPE_REALIZATION_PENDING,
EVTYPE_REALIZATION_RUNNING,
EVTYPE_REALIZATION_SUCCESS,
EVTYPE_REALIZATION_UNKNOWN,
EVTYPE_REALIZATION_WAITING,
EVTYPE_REALIZATION_TIMEOUT,
}

EVGROUP_FM_JOB = {
Expand All @@ -48,7 +48,7 @@
EVTYPE_FM_JOB_FAILURE,
}

EVGROUP_FM_ALL = EVGROUP_FM_STEP | EVGROUP_FM_JOB
EVGROUP_FM_ALL = EVGROUP_REALIZATION | EVGROUP_FM_JOB

EVTYPE_EE_SNAPSHOT = "com.equinor.ert.ee.snapshot"
EVTYPE_EE_SNAPSHOT_UPDATE = "com.equinor.ert.ee.snapshot_update"
Expand Down
157 changes: 29 additions & 128 deletions src/ert/ensemble_evaluator/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ class UnsupportedOperationException(ValueError):


_FM_TYPE_EVENT_TO_STATUS = {
ids.EVTYPE_FM_STEP_WAITING: state.STEP_STATE_WAITING,
ids.EVTYPE_FM_STEP_PENDING: state.STEP_STATE_PENDING,
ids.EVTYPE_FM_STEP_RUNNING: state.STEP_STATE_RUNNING,
ids.EVTYPE_FM_STEP_FAILURE: state.STEP_STATE_FAILURE,
ids.EVTYPE_FM_STEP_SUCCESS: state.STEP_STATE_SUCCESS,
ids.EVTYPE_FM_STEP_UNKNOWN: state.STEP_STATE_UNKNOWN,
ids.EVTYPE_FM_STEP_TIMEOUT: state.STEP_STATE_FAILURE,
ids.EVTYPE_REALIZATION_WAITING: state.REALIZATION_STATE_WAITING,
ids.EVTYPE_REALIZATION_PENDING: state.REALIZATION_STATE_PENDING,
ids.EVTYPE_REALIZATION_RUNNING: state.REALIZATION_STATE_RUNNING,
ids.EVTYPE_REALIZATION_FAILURE: state.REALIZATION_STATE_FAILED,
ids.EVTYPE_REALIZATION_SUCCESS: state.REALIZATION_STATE_FINISHED,
ids.EVTYPE_REALIZATION_UNKNOWN: state.REALIZATION_STATE_UNKNOWN,
ids.EVTYPE_REALIZATION_TIMEOUT: state.REALIZATION_STATE_FAILED,
ids.EVTYPE_FM_JOB_START: state.JOB_STATE_START,
ids.EVTYPE_FM_JOB_RUNNING: state.JOB_STATE_RUNNING,
ids.EVTYPE_FM_JOB_SUCCESS: state.JOB_STATE_FINISHED,
Expand All @@ -57,14 +57,6 @@ class UnsupportedOperationException(ValueError):
ids.EVTYPE_ENSEMBLE_FAILED: state.ENSEMBLE_STATE_FAILED,
}

_STEP_STATE_TO_REALIZATION_STATE = {
state.STEP_STATE_WAITING: state.REALIZATION_STATE_WAITING,
state.STEP_STATE_PENDING: state.REALIZATION_STATE_PENDING,
state.STEP_STATE_RUNNING: state.REALIZATION_STATE_RUNNING,
state.STEP_STATE_UNKNOWN: state.REALIZATION_STATE_UNKNOWN,
state.STEP_STATE_FAILURE: state.REALIZATION_STATE_FAILED,
}


def convert_iso8601_to_datetime(
timestamp: Union[datetime.datetime, str]
Expand All @@ -88,13 +80,6 @@ def __init__(self, snapshot: Optional["Snapshot"] = None) -> None:
realization number, pointing to a dict with keys active (bool),
start_time (datetime), end_time (datetime) and status (str)."""

self._step_states: Dict[
str, Dict[str, Union[str, datetime.datetime]]
] = defaultdict(dict)
"""A shallow dictionary of step states. The key is a string
realization id, pointing to a dict with the same members as the Step
class."""

self._job_states: Dict[
Tuple[str, str], Dict[str, Union[str, datetime.datetime]]
] = defaultdict(dict)
Expand All @@ -118,18 +103,18 @@ def update_metadata(self, metadata: Dict[str, Any]) -> None:
snapshot's metadata"""
self._metadata.update(_filter_nones(metadata))

def update_step(self, real_id: str, step: "Step") -> "PartialSnapshot":
step_update = _filter_nones(
{
"status": step.status,
"start_time": step.start_time,
"end_time": step.end_time,
}
def update_realization(
self,
real_id: str,
status: str,
start_time: Optional[datetime.datetime] = None,
end_time: Optional[datetime.datetime] = None,
) -> "PartialSnapshot":
self._realization_states[real_id].update(
_filter_nones(
{"status": status, "start_time": start_time, "end_time": end_time}
)
)
self._step_states[real_id].update(step_update)
if self._snapshot:
self._snapshot._my_partial._step_states[real_id].update(step_update)
self._propagate_step_update_to_realization(real_id)
return self

def update_job(
Expand All @@ -145,35 +130,6 @@ def update_job(
self._snapshot._my_partial._job_states[(real_id, job_id)].update(job_update)
return self

def _propagate_step_update_to_realization(self, real_id: str) -> "PartialSnapshot":
step = self._step_states[real_id]
step_status = step.get("status")
assert isinstance(step_status, str)
assert self._snapshot is not None

real_state = self._realization_states[real_id]
if real_state.get("status") == state.REALIZATION_STATE_FAILED:
return self
if (
step_status in _STEP_STATE_TO_REALIZATION_STATE
): # All but the finished state
self._realization_states[real_id].update(
{"status": _STEP_STATE_TO_REALIZATION_STATE[step_status]}
)
elif (
step_status == state.REALIZATION_STATE_FINISHED
and self._snapshot.step_finished(real_id)
):
real_state["status"] = state.REALIZATION_STATE_FINISHED
elif (
step_status == state.STEP_STATE_SUCCESS
and not self._snapshot.step_finished(real_id)
):
pass
else:
raise ValueError(f"unknown step status {step_status} for real: {real_id}")
return self

def get_all_jobs(
self,
) -> Mapping[Tuple[str, str], "Job"]:
Expand Down Expand Up @@ -226,20 +182,12 @@ def to_dict(self) -> Dict[str, Any]:
if self._realization_states:
_dict["reals"] = self._realization_states

for real_id, step_state in self._step_states.items():
if "reals" not in _dict:
_dict["reals"] = {real_id: {}}
if "step" not in _dict["reals"][real_id]:
_dict["reals"][real_id]["step"] = step_state

for job_tuple, job_values_dict in self._job_states.items():
real_id = job_tuple[0]
if "reals" not in _dict:
_dict["reals"] = {}
if real_id not in _dict["reals"]:
_dict["reals"][real_id] = {}
if "step" not in _dict["reals"][real_id]:
_dict["reals"][real_id]["step"] = {}
if "jobs" not in _dict["reals"][real_id]:
_dict["reals"][real_id]["jobs"] = {}

Expand All @@ -257,48 +205,38 @@ def _merge(self, other: "PartialSnapshot") -> "PartialSnapshot":
self._ensemble_state = other._ensemble_state
for real_id, other_real_data in other._realization_states.items():
self._realization_states[real_id].update(other_real_data)
for real_id, other_step_data in other._step_states.items():
self._step_states[real_id].update(other_step_data)
for job_id, other_job_data in other._job_states.items():
self._job_states[job_id].update(other_job_data)
return self

def from_cloudevent(self, event: CloudEvent) -> "PartialSnapshot":
e_type = event["type"]
e_source = event["source"]
status = _FM_TYPE_EVENT_TO_STATUS.get(e_type)
timestamp = event["time"]

if self._snapshot is None:
raise UnsupportedOperationException(
f"updating {self.__class__} without a snapshot is not supported"
)

if e_type in ids.EVGROUP_FM_STEP:
if e_type in ids.EVGROUP_REALIZATION:
status = _FM_TYPE_EVENT_TO_STATUS[e_type]
start_time = None
end_time = None
if e_type == ids.EVTYPE_FM_STEP_RUNNING:
if e_type == ids.EVTYPE_REALIZATION_RUNNING:
start_time = convert_iso8601_to_datetime(timestamp)
elif e_type in {
ids.EVTYPE_FM_STEP_SUCCESS,
ids.EVTYPE_FM_STEP_FAILURE,
ids.EVTYPE_FM_STEP_TIMEOUT,
ids.EVTYPE_REALIZATION_SUCCESS,
ids.EVTYPE_REALIZATION_FAILURE,
ids.EVTYPE_REALIZATION_TIMEOUT,
}:
end_time = convert_iso8601_to_datetime(timestamp)
self.update_step(
_get_real_id(e_source),
Step(
**_filter_nones(
{
"status": status,
"start_time": start_time,
"end_time": end_time,
}
)
),

self.update_realization(
_get_real_id(e_source), status, start_time, end_time
)

if e_type == ids.EVTYPE_FM_STEP_TIMEOUT:
if e_type == ids.EVTYPE_REALIZATION_TIMEOUT:
for job_id, job in self._snapshot.get_jobs_for_real(
_get_real_id(e_source)
).items():
Expand All @@ -317,6 +255,7 @@ def from_cloudevent(self, event: CloudEvent) -> "PartialSnapshot":
)

elif e_type in ids.EVGROUP_FM_JOB:
status = _FM_TYPE_EVENT_TO_STATUS[e_type]
start_time = None
end_time = None
if e_type == ids.EVTYPE_FM_JOB_START:
Expand Down Expand Up @@ -400,9 +339,6 @@ def get_job_status_for_all_reals(
def reals(self) -> Mapping[str, "RealizationSnapshot"]:
return self._my_partial.reals

def step(self, real_id: str) -> Dict[str, Union[str, datetime.datetime]]:
return self._my_partial._step_states[real_id]

def get_jobs_for_real(self, real_id: str) -> Dict[str, "Job"]:
return {
job_idx[1]: Job(**job_data)
Expand All @@ -413,17 +349,9 @@ def get_jobs_for_real(self, real_id: str) -> Dict[str, "Job"]:
def get_real(self, real_id: str) -> "RealizationSnapshot":
return RealizationSnapshot(**self._my_partial._realization_states[real_id])

def get_step(self, real_id: str) -> "Step":
return Step(**self._my_partial._step_states[real_id])

def get_job(self, real_id: str, job_id: str) -> "Job":
return Job(**self._my_partial._job_states[(real_id, job_id)])

def step_finished(self, real_id: str) -> bool:
return (
self._my_partial._step_states[real_id]["status"] == state.STEP_STATE_SUCCESS
)

def get_successful_realizations(self) -> int:
return len(
[
Expand Down Expand Up @@ -459,18 +387,11 @@ class Job(BaseModel):
stderr: Optional[str]


class Step(BaseModel):
status: Optional[str]
start_time: Optional[datetime.datetime]
end_time: Optional[datetime.datetime]


class RealizationSnapshot(BaseModel):
status: Optional[str]
active: Optional[bool]
start_time: Optional[datetime.datetime]
end_time: Optional[datetime.datetime]
step: Optional[Step]
jobs: Dict[str, Job] = {}


Expand All @@ -481,7 +402,6 @@ class SnapshotDict(BaseModel):


class SnapshotBuilder(BaseModel):
step: Optional[Step] = None
jobs: Dict[str, Job] = {}
metadata: Dict[str, Any] = {}

Expand All @@ -496,23 +416,13 @@ def build(
for r_id in real_ids:
top.reals[r_id] = RealizationSnapshot(
active=True,
step=self.step,
jobs=self.jobs,
start_time=start_time,
end_time=end_time,
status=status,
)
return Snapshot(top.dict())

def add_step(
self,
status: Optional[str],
start_time: Optional[datetime.datetime] = None,
end_time: Optional[datetime.datetime] = None,
) -> "SnapshotBuilder":
self.step = Step(status=status, start_time=start_time, end_time=end_time)
return self

def add_job(
self,
job_id: str,
Expand Down Expand Up @@ -555,15 +465,6 @@ def _from_nested_dict(data: Mapping[str, Any]) -> PartialSnapshot:
"end_time": realization_data.get("end_time"),
}
)
step_data = data["reals"][real_id].get("step", {})
if step_data is not None:
partial._step_states[real_id] = _filter_nones(
{
"status": step_data.get("status"),
"start_time": step_data.get("start_time"),
"end_time": step_data.get("end_time"),
}
)
for job_id, job in realization_data.get("jobs", {}).items():
job_idx = (real_id, job_id)
partial._job_states[job_idx] = job
Expand Down
Loading… (the diffs for the remaining changed files are not included in this capture)

0 comments on commit e7ec249

Please sign in to comment.