From a24623691237cda7abe5daffdbc77cb45f852740 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Sun, 15 Dec 2024 22:48:05 +0000 Subject: [PATCH] Revert flow-assignment test. --- tests/integration/test_flow_assignment.py | 163 ++++------------------ 1 file changed, 30 insertions(+), 133 deletions(-) diff --git a/tests/integration/test_flow_assignment.py b/tests/integration/test_flow_assignment.py index 7f186e8652..ea729efeb7 100644 --- a/tests/integration/test_flow_assignment.py +++ b/tests/integration/test_flow_assignment.py @@ -81,14 +81,11 @@ async def test_get_flow_nums(one: Scheduler, start): assert one.pool._get_flow_nums([FLOW_ALL]) == {1, 2} -async def test_flow_assignment_set( - flow: 'Fixture', - scheduler: 'Fixture', - start: 'Fixture', - log_filter: Callable, - capture_submission: 'Fixture', +@pytest.mark.parametrize('command', ['trigger', 'set']) +async def test_flow_assignment( + flow, scheduler, start, command: str, log_filter: Callable ): - """Test flow assignment when setting tasks. + """Test flow assignment when triggering/setting tasks. Active tasks: By default keep existing flows, else merge with requested flows. @@ -113,65 +110,53 @@ async def test_flow_assignment_set( } id_ = flow(conf) schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True) - async with start(schd) as log: - - # capture task submissions (prevents real submissions) - submitted_tasks = capture_submission(schd) - - command = "set" - do_command = functools.partial( - schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[] - ) - - # get active foo and bar (which is which doesn't matter) - # ("active" here means in the active task pool). - active_x, active_y = schd.pool.get_tasks() - schd.pool.merge_flows(active_y, schd.pool._get_flow_nums([FLOW_NEW])) - assert active_x.flow_nums == {1} - assert active_y.flow_nums == {1, 2} + async with start(schd): + if command == 'set': + do_command: Callable = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[] + ) + else: + do_command = schd.pool.force_trigger_tasks + + active_a, active_b = schd.pool.get_tasks() + schd.pool.merge_flows(active_b, schd.pool._get_flow_nums([FLOW_NEW])) + assert active_a.flow_nums == {1} + assert active_b.flow_nums == {1, 2} # -----(1. Test active tasks)----- - # Note this also tests that setting prerequisites - # By default active tasks keep existing flow assignment. - do_command([active_x.identity], flow=[]) - assert active_x.flow_nums == {1} + do_command([active_a.identity], flow=[]) + assert active_a.flow_nums == {1} # Else merge existing flow with requested flows. - do_command([active_x.identity], flow=[FLOW_ALL]) - assert active_x.flow_nums == {1, 2} + do_command([active_a.identity], flow=[FLOW_ALL]) + assert active_a.flow_nums == {1, 2} # (no-flow is ignored for active tasks) - do_command([active_x.identity], flow=[FLOW_NONE]) - assert active_x.flow_nums == {1, 2} + do_command([active_a.identity], flow=[FLOW_NONE]) + assert active_a.flow_nums == {1, 2} assert log_filter( contains=( - f'[{active_x}] ignoring \'flow=none\' {command}: ' - f'task already has {repr_flow_nums(active_x.flow_nums)}' + f'[{active_a}] ignoring \'flow=none\' {command}: ' + f'task already has {repr_flow_nums(active_a.flow_nums)}' ), level=logging.ERROR ) - do_command([active_x.identity], flow=[FLOW_NEW]) - assert active_x.flow_nums == {1, 2, 3} + do_command([active_a.identity], flow=[FLOW_NEW]) + assert active_a.flow_nums == {1, 2, 3} # -----(2. Test inactive tasks)----- - do_command = functools.partial( - schd.pool.set_prereqs_and_outputs, outputs=[], prereqs=['all'] - ) + if command == 'set': + do_command = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=[], prereqs=['all'] + ) # By default inactive tasks get all active flows. do_command(['1/a'], flow=[]) assert schd.pool._get_task_by_id('1/a').flow_nums == {1, 2, 3} - # set --pre=all should not cause a task to submit in a paused workflow - assert len(submitted_tasks) == 0 - # but triggering it should - schd.pool.force_trigger_tasks(['1/a'], []) - schd.release_tasks_to_run() - assert len(submitted_tasks) == 1 - # Else assign requested flows. do_command(['1/b'], flow=[FLOW_NONE]) assert schd.pool._get_task_by_id('1/b').flow_nums == set() @@ -181,93 +166,5 @@ async def test_flow_assignment_set( do_command(['1/d'], flow=[FLOW_ALL]) assert schd.pool._get_task_by_id('1/d').flow_nums == {1, 2, 3, 4} - - do_command(['1/e'], flow=[7]) - assert schd.pool._get_task_by_id('1/e').flow_nums == {7} - - -async def test_flow_assignment_trigger( - flow, scheduler, start, log_filter: Callable -): - """Test flow assignment when triggering tasks. - - Active tasks: - By default keep existing flows, else merge with requested flows. - Inactive tasks: - By default assign active flows; else assign requested flows. - - Note this differs from the 'set' test above because triggering a - task once makes it active (even in a paused workflow) after which - additional triggers get ignored. - - """ - conf = { - 'scheduler': { - 'allow implicit tasks': 'True' - }, - 'scheduling': { - 'graph': { - 'R1': "foo & bar => a & b & c & d & e" - } - }, - 'runtime': { - 'foo': { - 'outputs': {'x': 'x'} - } - }, - } - id_ = flow(conf) - schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True) - async with start(schd) as log: - - command = 'trigger' - do_command = schd.pool.force_trigger_tasks - - # get active foo and bar (which is which doesn't matter) - # ("active" here means in the active task pool). - active_x, active_y = schd.pool.get_tasks() - schd.pool.merge_flows(active_y, schd.pool._get_flow_nums([FLOW_NEW])) - assert active_x.flow_nums == {1} - assert active_y.flow_nums == {1, 2} - - # -----(1. Test active tasks)----- - - # By default active tasks keep existing flow assignment. - # This trigger makes the task literally active (submitted, running) - do_command([active_x.identity], flow=[]) - schd.release_tasks_to_run() - assert active_x.flow_nums == {1} - - # Triggering it again will be ignored (already active). - do_command([active_x.identity], flow=[FLOW_ALL]) - schd.release_tasks_to_run() - assert active_x.flow_nums == {1} - assert log_filter( - contains=( - f'[{active_x}] ignoring trigger - already active' - ), - level=logging.ERROR - ) - - # -----(2. Test inactive tasks)----- - # By default inactive tasks get all active flows. - do_command(['1/a'], flow=[]) - schd.release_tasks_to_run() - assert schd.pool._get_task_by_id('1/a').flow_nums == {1, 2} - - # Else assign requested flows. - do_command(['1/b'], flow=[FLOW_NONE]) - schd.release_tasks_to_run() - assert schd.pool._get_task_by_id('1/b').flow_nums == set() - - do_command(['1/c'], flow=[FLOW_NEW]) - schd.release_tasks_to_run() - assert schd.pool._get_task_by_id('1/c').flow_nums == {3} - - do_command(['1/d'], flow=[FLOW_ALL]) - schd.release_tasks_to_run() - assert schd.pool._get_task_by_id('1/d').flow_nums == {1, 2, 3} - do_command(['1/e'], flow=[7]) - schd.release_tasks_to_run() assert schd.pool._get_task_by_id('1/e').flow_nums == {7}