Skip to content

Commit

Permalink
Add integration test.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Dec 15, 2024
1 parent a246236 commit 35a8e0e
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
6 changes: 3 additions & 3 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2208,7 +2208,7 @@ def force_trigger_tasks(
flow: List[str],
flow_wait: bool = False,
flow_descr: Optional[str] = None,
now: bool = False
on_resume: bool = False
):
"""Manually trigger tasks.
Expand Down Expand Up @@ -2244,7 +2244,7 @@ def force_trigger_tasks(
LOG.error(f"[{itask}] ignoring trigger - already active")
continue
self.merge_flows(itask, flow_nums)
self._force_trigger(itask, now)
self._force_trigger(itask, on_resume)

# Spawn and trigger inactive tasks.
if not flow:
Expand Down Expand Up @@ -2279,7 +2279,7 @@ def force_trigger_tasks(

# run it (or run it again for incomplete flow-wait)
self.add_to_pool(itask)
self._force_trigger(itask, now)
self._force_trigger(itask, on_resume)

def spawn_parentless_sequential_xtriggers(self):
"""Spawn successor(s) of parentless wall clock satisfied tasks."""
Expand Down
63 changes: 62 additions & 1 deletion tests/integration/test_force_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def test_trigger_workflow_paused(
schd.release_tasks_to_run()
assert len(submitted_tasks) == 1

# manually trigger 1/y - it should not be queued but not submitted
# manually trigger 1/y - it should be queued but not submitted
# (queue limit reached)
schd.pool.force_trigger_tasks(['1/y'], [1])
schd.release_tasks_to_run()
Expand All @@ -87,3 +87,64 @@ async def test_trigger_workflow_paused(
level=logging.ERROR,
contains="ignoring trigger - already active"
)


async def test_trigger_on_resume(
flow: 'Fixture',
scheduler: 'Fixture',
start: 'Fixture',
capture_submission: 'Fixture',
log_filter: Callable
):
"""
Test manual triggering on-resume option when the workflow is paused.
https://github.com/cylc/cylc-flow/issues/6192
"""
id_ = flow({
'scheduling': {
'queues': {
'default': {
'limit': 1,
},
},
'graph': {
'R1': '''
a => x & y & z
''',
},
},
})
schd = scheduler(id_, paused_start=True)

# start the scheduler (but don't set the main loop running)
async with start(schd) as log:

# capture task submissions (prevents real submissions)
submitted_tasks = capture_submission(schd)

# paused at start-up so no tasks should be submitted
assert len(submitted_tasks) == 0

# manually trigger 1/x - it not should be submitted
schd.pool.force_trigger_tasks(['1/x'], [1], on_resume=True)
schd.release_tasks_to_run()
assert len(submitted_tasks) == 0

# manually trigger 1/y - it should not be submitted
# (queue limit reached)
schd.pool.force_trigger_tasks(['1/y'], [1], on_resume=True)
schd.release_tasks_to_run()
assert len(submitted_tasks) == 0

# manually trigger 1/y again - it should not be submitted
# (triggering a queued task runs it)
schd.pool.force_trigger_tasks(['1/y'], [1], on_resume=True)
schd.release_tasks_to_run()
assert len(submitted_tasks) == 0

# resume the workflow, both tasks should trigger now.
schd.resume_workflow()
schd.release_tasks_to_run()
assert len(submitted_tasks) == 2

0 comments on commit 35a8e0e

Please sign in to comment.