From 35a8e0eb3696f60fd01f6eac9af06f88afc2c5f9 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Sun, 15 Dec 2024 23:02:33 +0000 Subject: [PATCH] Add integration test. --- cylc/flow/task_pool.py | 6 +-- tests/integration/test_force_trigger.py | 63 ++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 5b35773d8c..4732f43d58 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -2208,7 +2208,7 @@ def force_trigger_tasks( flow: List[str], flow_wait: bool = False, flow_descr: Optional[str] = None, - now: bool = False + on_resume: bool = False ): """Manually trigger tasks. @@ -2244,7 +2244,7 @@ def force_trigger_tasks( LOG.error(f"[{itask}] ignoring trigger - already active") continue self.merge_flows(itask, flow_nums) - self._force_trigger(itask, now) + self._force_trigger(itask, on_resume) # Spawn and trigger inactive tasks. if not flow: @@ -2279,7 +2279,7 @@ def force_trigger_tasks( # run it (or run it again for incomplete flow-wait) self.add_to_pool(itask) - self._force_trigger(itask, now) + self._force_trigger(itask, on_resume) def spawn_parentless_sequential_xtriggers(self): """Spawn successor(s) of parentless wall clock satisfied tasks.""" diff --git a/tests/integration/test_force_trigger.py b/tests/integration/test_force_trigger.py index b5f2c4b3f4..7c38c0cec3 100644 --- a/tests/integration/test_force_trigger.py +++ b/tests/integration/test_force_trigger.py @@ -66,7 +66,7 @@ async def test_trigger_workflow_paused( schd.release_tasks_to_run() assert len(submitted_tasks) == 1 - # manually trigger 1/y - it should not be queued but not submitted + # manually trigger 1/y - it should be queued but not submitted # (queue limit reached) schd.pool.force_trigger_tasks(['1/y'], [1]) schd.release_tasks_to_run() @@ -87,3 +87,64 @@ async def test_trigger_workflow_paused( level=logging.ERROR, contains="ignoring trigger - already active" ) + + +async def test_trigger_on_resume( + flow: 'Fixture', + scheduler: 'Fixture', + start: 'Fixture', + capture_submission: 'Fixture', + log_filter: Callable +): + """ + Test manual triggering on-resume option when the workflow is paused. + + https://github.com/cylc/cylc-flow/issues/6192 + + """ + id_ = flow({ + 'scheduling': { + 'queues': { + 'default': { + 'limit': 1, + }, + }, + 'graph': { + 'R1': ''' + a => x & y & z + ''', + }, + }, + }) + schd = scheduler(id_, paused_start=True) + + # start the scheduler (but don't set the main loop running) + async with start(schd) as log: + + # capture task submissions (prevents real submissions) + submitted_tasks = capture_submission(schd) + + # paused at start-up so no tasks should be submitted + assert len(submitted_tasks) == 0 + + # manually trigger 1/x - it not should be submitted + schd.pool.force_trigger_tasks(['1/x'], [1], on_resume=True) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 0 + + # manually trigger 1/y - it should not be submitted + # (queue limit reached) + schd.pool.force_trigger_tasks(['1/y'], [1], on_resume=True) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 0 + + # manually trigger 1/y again - it should not be submitted + # (triggering a queued task runs it) + schd.pool.force_trigger_tasks(['1/y'], [1], on_resume=True) + schd.release_tasks_to_run() + assert len(submitted_tasks) == 0 + + # resume the workflow, both tasks should trigger now. + schd.resume_workflow() + schd.release_tasks_to_run() + assert len(submitted_tasks) == 2