Add retry loop to the scheduler to handle resubmission when a job fails in the running state #6782
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

@@            Coverage Diff             @@
##             main    #6782      +/-   ##
==========================================
+ Coverage   83.86%   83.87%   +0.01%
==========================================
  Files         365      365
  Lines       21353    21380      +27
  Branches      948      948
==========================================
+ Hits        17907    17932      +25
- Misses       3152     3154       +2
  Partials      294      294

☔ View full report in Codecov by Sentry.
Force-pushed from 7698ac5 to 3e725e8 (Compare)
Force-pushed from 04defcf to b0b890c (Compare)
src/ert/scheduler/job.py (Outdated)

            await asyncio.sleep(0.01)
        returncode = await self.returncode
        # we need to make sure that the task has finished too
        await self.driver.wait(self.real.iens)
Is it optimal to use the name wait() for this? It looks the same as the lines with self.started.wait(), while the latter is waiting for an asyncio.Event.
Maybe a name that is sufficiently clear to make this comment unnecessary.
This wait function leaks implementation details of the LocalDriver. Why can't the LocalDriver itself do the right thing when the job is ended / restarted?
Also, I'm not sure what we are waiting for anyway. The subprocess is done at this point.
Why can't the LocalDriver itself do the right thing when the job is ended / restarted?
Since we want to have it non-blocking, we need to await outside of that scope. The fact that the subprocess has ended does not guarantee that the task has ended too; i.e. I got test failures without this.
The fact that it's a subprocess is an implementation detail. await self.returncode already means "wait for the job to end". Do we need another wait function to do this?
Correct, but as you said, it's the job that has finished, not the task. You can try to remove it and the test with retries will fail. I guess this await task does more for that async context.
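To illustrate the point being discussed, here is a minimal, self-contained sketch (not ert code; _wait_until_finish and the future are stand-ins) showing that a returncode future can resolve while the asyncio.Task that set it is still pending, which is why something still has to await the task itself:

import asyncio


async def main() -> None:
    returncode: "asyncio.Future[int]" = asyncio.get_running_loop().create_future()

    async def _wait_until_finish() -> None:
        proc = await asyncio.create_subprocess_exec("true")
        returncode.set_result(await proc.wait())
        # the task keeps running after the future is resolved,
        # e.g. doing bookkeeping or cleanup
        await asyncio.sleep(0.1)

    task = asyncio.create_task(_wait_until_finish())
    await returncode      # the subprocess is done at this point ...
    print(task.done())    # ... but the wrapping task is typically not
    await task            # only an explicit await guarantees it finished


asyncio.run(main())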
src/ert/scheduler/job.py (Outdated)

            sem.release()
        retries = 0
        retry: bool = True
        while retry:
Maybe separate the retry functionality into its own function? This is beginning to be a mess.
E.g.:

async def __call__(...):
    for _ in range(max_submit):
        if await self.actually_do_the_thing(...):
            # SUCCESS!
            break
    else:
        # FAILURE (no more retries)
        ...

Also, using a normal loop will probably avoid having to keep these retries and retry variables and manually check whether you should escape the loop.
Also, you may want to reset the events and futures on this job.
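As a hedged illustration of that suggestion (the attribute names mirror what is visible in this thread, but the class body is a reduced stand-in, not the actual Job):

import asyncio


class Job:
    # reduced stand-in: only the retry-related state is shown
    def __init__(self) -> None:
        self.started = asyncio.Event()
        self.returncode: "asyncio.Future[int] | None" = None

    def reset_for_resubmission(self) -> None:
        # clear the "started" event and hand out a fresh future so a new
        # attempt does not observe the previous attempt's result
        self.started.clear()
        self.returncode = asyncio.get_running_loop().create_future()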
Maybe separate the retry functionality into its own function? This is beginning to be a mess.
Agreed, I saw it coming; I'll give it a try at splitting it in two.
Now it's split.
src/ert/scheduler/job.py (Outdated)

        finally:
            sem.release()
        retries = 0
        retry: bool = True
Suggested change:
- retry: bool = True
+ retry = True

True has type bool, just like 0 has type int. No need to specify it explicitly.
src/ert/scheduler/local_driver.py (Outdated)

@@ -23,6 +23,9 @@ async def kill(self, iens: int) -> None:
        except KeyError:
            return

    async def wait(self, iens: int) -> None:
What would an LSFDriver implementation of this function look like?
It could be an Event that is triggered?
I mean, what are we waiting for in LSF? The job being completed/failed is already covered by Job's returncode.
For LocalDriver, this task was never awaited:

self._tasks[iens] = asyncio.create_task(
    self._wait_until_finish(iens, executable, *args, cwd=cwd)
)

We need to move the awaiting of the task out of the driver and up to the Job level (i.e. await driver.wait()) so that the driver's atomic functions (init, kill, poll) stay non-blocking. For LSF 🤷 maybe an empty implementation, then?
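For concreteness, here is a sketch of how the driver could expose the stored task so that the Job can await it; the _tasks dict mirrors the snippet above, everything else is illustrative and not the actual LocalDriver:

import asyncio
from typing import Dict


class LocalDriver:
    def __init__(self) -> None:
        # one background task per realization index (iens), as in the
        # create_task() snippet quoted above
        self._tasks: Dict[int, "asyncio.Task[None]"] = {}

    async def wait(self, iens: int) -> None:
        # submit/kill/poll stay non-blocking; only this call suspends
        # until the realization's background task has truly finished
        task = self._tasks.get(iens)
        if task is not None:
            await task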
Force-pushed from 6b5ec33 to b3737a3 (Compare)
src/ert/scheduler/job.py (Outdated)

@@ -81,6 +80,9 @@ async def __call__(
        while not self.returncode.done():
            await asyncio.sleep(0.01)
        returncode = await self.returncode
        # we need to make sure that the task has finished too
        await self.driver.wait(self.real.iens)
I think the presence of a comment here means that the function in the driver should have a clearer name.
Right, it was not clear that the name was the issue :)
src/ert/scheduler/driver.py (Outdated)

    @abstractmethod
    async def wait(self, iens: int) -> None:
        """Blocks the execution of a job associated with a realization.
I think it is too vague what is to be waited for in implementations of this function. And should we use "block", as we never block anything, do we?
We do when we call it, but you are correct, it's not the job execution that is blocked, only the event loop, I guess.
@@ -108,3 +107,16 @@ async def test_cancel(tmp_path: Path, realization):

    assert (tmp_path / "a").exists()
    assert not (tmp_path / "b").exists()


async def test_that_max_submit_was_reached(tmp_path: Path, realization):
Should we ensure that the edge case max_submit=1 is covered?
(What about max_submit=0? Is that covered by the config parser?)
I can do a parametrization ranging from 1 to 3. Regarding 0, that's a matter of config and not this test.
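A hedged sketch of what such a parametrization could look like (the test body and fixtures are placeholders, not the actual test from this PR, and it assumes an asyncio-enabled pytest setup such as pytest-asyncio):

import pytest


@pytest.mark.parametrize("max_submit", [1, 2, 3])
async def test_that_max_submit_was_reached(tmp_path, realization, max_submit):
    # a failing realization should be submitted exactly max_submit times
    # before the scheduler gives up; the assertions are elided here
    ...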
LGTM!
Issue
Resolves #6771
Approach
Use a while retry loop to iterate from the running state back to the waiting state. It includes a simple test to check whether a job has started several times. max_submit is a function parameter of Job.__call__ that is passed on from the scheduler.
Additionally, the function driver.finish will implement the basic clean-up functionality. For the local driver it makes sure that all tasks have been awaited correctly.
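As a rough, self-contained illustration of this approach (names and structure are simplified stand-ins, not the actual scheduler code; driver clean-up is reduced to a comment):

import asyncio
import random


class TinyScheduler:
    # toy model: retry a failing realization up to max_submit times
    async def run_realization(self, iens: int, max_submit: int = 2) -> bool:
        for _attempt in range(max_submit):
            returncode = await self._submit_and_wait(iens)
            if returncode == 0:
                return True   # success: stop resubmitting
            # non-zero returncode: move back to the waiting state and resubmit
        # here the real code would also run driver clean-up (driver.finish)
        return False          # max_submit reached without success

    async def _submit_and_wait(self, iens: int) -> int:
        # stand-in for submitting to a driver and awaiting its returncode
        await asyncio.sleep(0)
        return random.choice([0, 1])


async def main() -> None:
    ok = await TinyScheduler().run_realization(iens=0, max_submit=3)
    print("succeeded" if ok else "max_submit reached")


asyncio.run(main())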