Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose job_queue as a property #6628

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions src/ert/simulator/simulation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def __init__(

# Wait until the queue is active before we finish the creation
# to ensure sane job status while running
while self.isRunning() and not self._job_queue.is_active():
while self.isRunning() and not self.job_queue.is_active():
sleep(0.1)

def get_run_args(self, iens: int) -> "RunArg":
Expand All @@ -139,7 +139,7 @@ def get_run_args(self, iens: int) -> "RunArg":
def _run_simulations_simple_step(self) -> Thread:
sim_thread = Thread(
target=lambda: _run_forward_model(
self._ert, self._job_queue, self._run_context
self._ert, self.job_queue, self._run_context
)
)
sim_thread.start()
Expand All @@ -150,15 +150,13 @@ def __len__(self) -> int:

def isRunning(self) -> bool:
# TODO: Should separate between running jobs and having loaded all data
return self._sim_thread.is_alive() or self._job_queue.is_active()
return self._sim_thread.is_alive() or self.job_queue.is_active()

def didRealizationSucceed(self, iens: int) -> bool:
queue_index = self.get_run_args(iens).queue_index
if queue_index is None:
raise ValueError("Queue index not set")
return (
self._job_queue.realization_state(queue_index) == RealizationState.SUCCESS
)
return self.job_queue.realization_state(queue_index) == RealizationState.SUCCESS

def didRealizationFail(self, iens: int) -> bool:
# For the purposes of this class, a failure should be anything (killed
Expand All @@ -171,19 +169,23 @@ def isRealizationFinished(self, iens: int) -> bool:
queue_index = run_arg.queue_index
if queue_index is not None:
return not (
self._job_queue.realization_state(queue_index)
self.job_queue.realization_state(queue_index)
in [RealizationState.SUCCESS, RealizationState.WAITING]
)
else:
# job was not submitted
return False

@property
def job_queue(self) -> JobQueue:
return self._job_queue

def __repr__(self) -> str:
running = "running" if self.isRunning() else "not running"
numRunn = self._job_queue.count_status(RealizationState.RUNNING)
numSucc = self._job_queue.count_status(RealizationState.SUCCESS)
numFail = self._job_queue.count_status(RealizationState.FAILED)
numWait = self._job_queue.count_status(RealizationState.WAITING)
numRunn = self.job_queue.count_status(RealizationState.RUNNING)
numSucc = self.job_queue.count_status(RealizationState.SUCCESS)
numFail = self.job_queue.count_status(RealizationState.FAILED)
numWait = self.job_queue.count_status(RealizationState.WAITING)
return (
f"SimulationContext({running}, #running = {numRunn}, "
f"#success = {numSucc}, #failed = {numFail}, #waiting = {numWait})"
Expand All @@ -193,7 +195,7 @@ def get_sim_fs(self) -> EnsembleAccessor:
return self._run_context.sim_fs

def stop(self) -> None:
self._job_queue.kill_all_jobs()
self.job_queue.kill_all_jobs()
self._sim_thread.join()

def job_progress(self, iens: int) -> Optional[ForwardModelStatus]:
Expand Down Expand Up @@ -224,7 +226,7 @@ def job_progress(self, iens: int) -> Optional[ForwardModelStatus]:
if queue_index is None:
# job was not submitted
return None
if self._job_queue.realization_state(queue_index) == RealizationState.WAITING:
if self.job_queue.realization_state(queue_index) == RealizationState.WAITING:
return None

return ForwardModelStatus.load(run_arg.runpath)
Expand All @@ -242,4 +244,4 @@ def job_status(self, iens: int) -> Optional[RealizationState]:
if queue_index is None:
# job was not submitted
return None
return self._job_queue.realization_state(queue_index)
return self.job_queue.realization_state(queue_index)
12 changes: 6 additions & 6 deletions tests/unit_tests/simulator/test_simulation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ def test_simulation_context(setup_case, storage):
f"runpath/realization-{iens}-{iens}/iter-0"
)

assert even_ctx.getNumFailed() == 0
assert even_ctx.getNumRunning() == 0
assert even_ctx.getNumSuccess() == size / 2
assert even_ctx.job_queue.count_status(RealizationState.FAILED) == 0
assert even_ctx.job_queue.count_status(RealizationState.RUNNING) == 0
assert even_ctx.job_queue.count_status(RealizationState.SUCCESS) == size / 2

assert odd_ctx.getNumFailed() == 0
assert odd_ctx.getNumRunning() == 0
assert odd_ctx.getNumSuccess() == size / 2
assert odd_ctx.job_queue.count_status(RealizationState.FAILED) == 0
assert odd_ctx.job_queue.count_status(RealizationState.RUNNING) == 0
assert odd_ctx.job_queue.count_status(RealizationState.SUCCESS) == size / 2

for iens in range(size):
if iens % 2 == 0:
Expand Down
Loading