Skip to content

Commit

Permalink
Propagate and show exec_hosts in run_dialog if present
Browse files Browse the repository at this point in the history
  • Loading branch information
andreas-el authored and berland committed Oct 7, 2024
1 parent a52cebf commit 80a06be
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/_ert/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class RealizationBaseEvent(BaseEvent):
real: str
ensemble: Union[str, None] = None
queue_event_type: Union[str, None] = None
exec_hosts: Union[str, None] = None


class RealizationPending(RealizationBaseEvent):
Expand Down
6 changes: 6 additions & 0 deletions src/ert/ensemble_evaluator/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ def update_realization(
status: str,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
exec_hosts: Optional[str] = None,
callback_status_message: Optional[str] = None,
) -> "EnsembleSnapshot":
self._realization_snapshots[real_id].update(
Expand All @@ -260,6 +261,7 @@ def update_realization(
status=status,
start_time=start_time,
end_time=end_time,
exec_hosts=exec_hosts,
callback_status_message=callback_status_message,
)
)
Expand All @@ -279,6 +281,7 @@ def update_from_event(
status = _FM_TYPE_EVENT_TO_STATUS[type(event)]
start_time = None
end_time = None
exec_hosts = event.exec_hosts
callback_status_message = None

if e_type is RealizationRunning:
Expand All @@ -296,6 +299,7 @@ def update_from_event(
status,
start_time,
end_time,
exec_hosts,
callback_status_message,
)

Expand Down Expand Up @@ -397,6 +401,7 @@ class RealizationSnapshot(TypedDict, total=False):
active: Optional[bool]
start_time: Optional[datetime]
end_time: Optional[datetime]
exec_hosts: Optional[str]
fm_steps: Dict[str, FMStepSnapshot]
callback_status_message: Optional[str]

Expand All @@ -409,6 +414,7 @@ def _realization_dict_to_realization_snapshot(
active=source.get("active"),
start_time=source.get("start_time"),
end_time=source.get("end_time"),
exec_hosts=source.get("exec_hosts"),
callback_status_message=source.get("callback_status_message"),
fm_steps=source.get("fm_steps", {}),
)
Expand Down
1 change: 1 addition & 0 deletions src/ert/gui/model/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class RealNodeData:
real_status_color: Optional[QColor] = None
current_memory_usage: Optional[int] = None
max_memory_usage: Optional[int] = None
exec_hosts: Optional[str] = None
stderr: Optional[str] = None
callback_status_message: Optional[str] = None

Expand Down
3 changes: 3 additions & 0 deletions src/ert/gui/model/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ def _update_snapshot(self, snapshot: EnsembleSnapshot, iter_: str) -> None:
data = real_node.data
if real_status := real.get("status"):
data.status = real_status
if real_exec_hosts := real.get("exec_hosts"):
data.exec_hosts = real_exec_hosts
for real_fm_step_id, color in (
metadata["aggr_fm_step_status_colors"].get(real_id, {}).items()
):
Expand Down Expand Up @@ -240,6 +242,7 @@ def _add_snapshot(self, snapshot: EnsembleSnapshot, iter_: str) -> None:
data=RealNodeData(
status=real.get("status"),
active=real.get("active"),
exec_hosts=real.get("exec_hosts"),
fm_step_status_color_by_id=metadata.get(
"aggr_fm_step_status_colors", defaultdict(None)
)[real_id],
Expand Down
14 changes: 13 additions & 1 deletion src/ert/gui/simulation/run_dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def __init__(
self._snapshot_model.rowsInserted.connect(self.on_snapshot_new_iteration)

self._fm_step_label = QLabel(self)
self._fm_step_label.setObjectName("fm_step_label")
self._fm_step_overview = FMStepOverview(self._snapshot_model, self)

self.running_time = QLabel("")
Expand Down Expand Up @@ -335,10 +336,21 @@ def on_snapshot_new_iteration(
def _select_real(self, index: QModelIndex) -> None:
real = index.row()
iter_ = index.model().get_iter() # type: ignore
exec_hosts = None

iter_node = self._snapshot_model.root.children.get(str(iter_), None)
if iter_node:
real_node = iter_node.children.get(str(real), None)
if real_node:
exec_hosts = real_node.data.exec_hosts

self._fm_step_overview.set_realization(iter_, real)
self._fm_step_label.setText(
text = (
f"Realization id {index.data(RealIens)} in iteration {index.data(IterNum)}"
)
if exec_hosts and exec_hosts != "-":
text += f", assigned to host: {exec_hosts}"
self._fm_step_label.setText(text)

def closeEvent(self, a0: Optional[QCloseEvent]) -> None:
if not self._notifier.is_simulation_running:
Expand Down
2 changes: 2 additions & 0 deletions src/ert/scheduler/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
@dataclass
class StartedEvent:
iens: int
exec_hosts: str = "-"


@dataclass
class FinishedEvent:
iens: int
returncode: int
exec_hosts: str = "-"


Event = Union[StartedEvent, FinishedEvent]
2 changes: 2 additions & 0 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(self, scheduler: Scheduler, real: Realization) -> None:
self.real = real
self.state = JobState.WAITING
self.started = asyncio.Event()
self.exec_hosts: str = "-"
self.returncode: asyncio.Future[int] = asyncio.Future()
self._aborted = False
self._scheduler: Scheduler = scheduler
Expand Down Expand Up @@ -263,6 +264,7 @@ async def _send(self, state: JobState) -> None:
"event_type": _queue_jobstate_event_type[state],
"queue_event_type": state,
"real": str(self.iens),
"exec_hosts": self.exec_hosts,
}
self.state = state
if state == JobState.FAILED:
Expand Down
12 changes: 9 additions & 3 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,16 +500,22 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
event: Optional[Event] = None
if isinstance(new_state, RunningJob):
logger.debug(f"Realization {iens} is running")
event = StartedEvent(iens=iens)
event = StartedEvent(iens=iens, exec_hosts=self._jobs[job_id].exec_hosts)
elif isinstance(new_state, FinishedJobFailure):
logger.info(f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed")
exit_code = await self._get_exit_code(job_id)
event = FinishedEvent(iens=iens, returncode=exit_code)
event = FinishedEvent(
iens=iens,
returncode=exit_code,
exec_hosts=self._jobs[job_id].exec_hosts,
)
elif isinstance(new_state, FinishedJobSuccess):
logger.info(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"
)
event = FinishedEvent(iens=iens, returncode=0)
event = FinishedEvent(
iens=iens, returncode=0, exec_hosts=self._jobs[job_id].exec_hosts
)

if event:
if isinstance(event, FinishedEvent):
Expand Down
5 changes: 4 additions & 1 deletion src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from ert.constant_filenames import CERT_FILE

from .driver import Driver
from .event import FinishedEvent
from .event import FinishedEvent, StartedEvent
from .job import Job, JobState

if TYPE_CHECKING:
Expand Down Expand Up @@ -308,6 +308,9 @@ async def _process_event_queue(self) -> None:
# Any event implies the job has at least started
job.started.set()

if isinstance(event, (StartedEvent, FinishedEvent)) and event.exec_hosts:
self._jobs[event.iens].exec_hosts = event.exec_hosts

if (
isinstance(event, FinishedEvent)
and not self._cancelled
Expand Down
2 changes: 2 additions & 0 deletions tests/ert/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def build(
self,
real_ids: Sequence[str],
status: Optional[str],
exec_hosts: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
) -> EnsembleSnapshot:
Expand All @@ -49,6 +50,7 @@ def build(
fm_steps=deepcopy(self.fm_steps),
start_time=start_time,
end_time=end_time,
exec_hosts=exec_hosts,
status=status,
),
)
Expand Down
1 change: 1 addition & 0 deletions tests/ert/unit_tests/gui/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def full_snapshot() -> EnsembleSnapshot:
real = RealizationSnapshot(
status=REALIZATION_STATE_RUNNING,
active=True,
exec_hosts="COMP-01",
fm_steps={
"0": FMStepSnapshot(
start_time=dt.now(),
Expand Down
28 changes: 28 additions & 0 deletions tests/ert/unit_tests/gui/model/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,31 @@ def test_snapshot_model_data_intact_on_full_update(full_snapshot, fail_snapshot)
first_real = model.index(0, 0, model.index(0, 0))

assert first_real.internalPointer().children["0"].data["status"] == "Finished"


@pytest.mark.parametrize(
"has_exec_hosts, expected_value",
[
pytest.param(
True,
"COMP-01",
id="Host assigned",
),
pytest.param(
False,
None,
id="No host assigned",
),
],
)
def test_snapshot_model_exec_hosts_propagated(
full_snapshot, fail_snapshot, has_exec_hosts, expected_value
):
model = SnapshotModel()
a_snapshot = full_snapshot if has_exec_hosts else fail_snapshot

model._add_snapshot(SnapshotModel.prerender(a_snapshot), "0")
model._update_snapshot(SnapshotModel.prerender(a_snapshot), "0")

first_real = model.index(0, 0, model.index(0, 0))
assert first_real.internalPointer().data.exec_hosts == expected_value
101 changes: 100 additions & 1 deletion tests/ert/unit_tests/gui/simulation/test_run_dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@
from pytestqt.qtbot import QtBot
from qtpy import QtWidgets
from qtpy.QtCore import Qt, QTimer
from qtpy.QtWidgets import QApplication, QComboBox, QPushButton, QToolButton, QWidget
from qtpy.QtWidgets import (
QApplication,
QComboBox,
QLabel,
QPushButton,
QToolButton,
QWidget,
)

import ert
from ert.config import ErtConfig
Expand Down Expand Up @@ -459,6 +466,98 @@ def test_run_dialog_memory_usage_showing(
assert max_memory_value == "60.00 KB"


@pytest.mark.parametrize(
"events, tab_widget_count, expected_host_info",
[
pytest.param(
[
FullSnapshotEvent(
snapshot=(
SnapshotBuilder()
.add_fm_step(
fm_step_id="0",
index="0",
name="fm_step_0",
status=state.FORWARD_MODEL_STATE_START,
)
.build(
["0"],
status=state.REALIZATION_STATE_UNKNOWN,
exec_hosts="COMP_01",
)
),
iteration_label="Foo",
current_iteration=0,
total_iterations=1,
progress=0.25,
realization_count=4,
status_count={"Finished": 1, "Pending": 1, "Unknown": 2},
iteration=0,
),
EndEvent(failed=False, msg=""),
],
1,
", assigned to host: COMP_01",
id="Simulation where exec_host present",
),
pytest.param(
[
FullSnapshotEvent(
snapshot=(
SnapshotBuilder()
.add_fm_step(
fm_step_id="0",
index="0",
name="fm_step_0",
status=state.FORWARD_MODEL_STATE_START,
)
.build(["0"], status=state.REALIZATION_STATE_UNKNOWN)
),
iteration_label="Foo",
current_iteration=0,
total_iterations=1,
progress=0.25,
realization_count=4,
status_count={"Finished": 1, "Pending": 1, "Unknown": 2},
iteration=0,
),
EndEvent(failed=False, msg=""),
],
1,
"",
id="Simulation where exec_host not present",
),
],
)
def test_run_dialog_fm_label_show_correct_info(
events, tab_widget_count, expected_host_info, qtbot: QtBot, event_queue, run_dialog
):
run_dialog.run_experiment()
for event in events:
event_queue.put(event)

qtbot.waitUntil(
lambda: run_dialog._tab_widget.count() == tab_widget_count, timeout=5000
)
qtbot.waitUntil(lambda: not run_dialog.done_button.isHidden(), timeout=5000)

# This is the container of realization boxes
realization_box = run_dialog._tab_widget.widget(0)
assert type(realization_box) == RealizationWidget
# Click the first realization box
qtbot.mouseClick(realization_box, Qt.LeftButton)
fm_step_model = run_dialog._fm_step_overview.model()
assert fm_step_model._real == 0

fm_step_label = run_dialog.findChild(QLabel, name="fm_step_label")
assert not fm_step_label.text()

realization_box._item_clicked(run_dialog._fm_step_overview.model().index(0, 0))
assert (
fm_step_label.text() == f"Realization id 0 in iteration 0{expected_host_info}"
)


@pytest.mark.integration_test
@pytest.mark.usefixtures("use_tmpdir")
def test_that_exception_in_base_run_model_is_handled(qtbot: QtBot, storage):
Expand Down

0 comments on commit 80a06be

Please sign in to comment.