Skip to content

Commit

Permalink
Revert flow-assignment test.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Dec 15, 2024
1 parent 3d75bd2 commit a246236
Showing 1 changed file with 30 additions and 133 deletions.
163 changes: 30 additions & 133 deletions tests/integration/test_flow_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,11 @@ async def test_get_flow_nums(one: Scheduler, start):
assert one.pool._get_flow_nums([FLOW_ALL]) == {1, 2}


async def test_flow_assignment_set(
flow: 'Fixture',
scheduler: 'Fixture',
start: 'Fixture',
log_filter: Callable,
capture_submission: 'Fixture',
@pytest.mark.parametrize('command', ['trigger', 'set'])
async def test_flow_assignment(
flow, scheduler, start, command: str, log_filter: Callable
):
"""Test flow assignment when setting tasks.
"""Test flow assignment when triggering/setting tasks.
Active tasks:
By default keep existing flows, else merge with requested flows.
Expand All @@ -113,65 +110,53 @@ async def test_flow_assignment_set(
}
id_ = flow(conf)
schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True)
async with start(schd) as log:

# capture task submissions (prevents real submissions)
submitted_tasks = capture_submission(schd)

command = "set"
do_command = functools.partial(
schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[]
)

# get active foo and bar (which is which doesn't matter)
# ("active" here means in the active task pool).
active_x, active_y = schd.pool.get_tasks()
schd.pool.merge_flows(active_y, schd.pool._get_flow_nums([FLOW_NEW]))
assert active_x.flow_nums == {1}
assert active_y.flow_nums == {1, 2}
async with start(schd):
if command == 'set':
do_command: Callable = functools.partial(
schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[]
)
else:
do_command = schd.pool.force_trigger_tasks

active_a, active_b = schd.pool.get_tasks()
schd.pool.merge_flows(active_b, schd.pool._get_flow_nums([FLOW_NEW]))
assert active_a.flow_nums == {1}
assert active_b.flow_nums == {1, 2}

# -----(1. Test active tasks)-----

# Note this also tests that setting prerequisites

# By default active tasks keep existing flow assignment.
do_command([active_x.identity], flow=[])
assert active_x.flow_nums == {1}
do_command([active_a.identity], flow=[])
assert active_a.flow_nums == {1}

# Else merge existing flow with requested flows.
do_command([active_x.identity], flow=[FLOW_ALL])
assert active_x.flow_nums == {1, 2}
do_command([active_a.identity], flow=[FLOW_ALL])
assert active_a.flow_nums == {1, 2}

# (no-flow is ignored for active tasks)
do_command([active_x.identity], flow=[FLOW_NONE])
assert active_x.flow_nums == {1, 2}
do_command([active_a.identity], flow=[FLOW_NONE])
assert active_a.flow_nums == {1, 2}
assert log_filter(
contains=(
f'[{active_x}] ignoring \'flow=none\' {command}: '
f'task already has {repr_flow_nums(active_x.flow_nums)}'
f'[{active_a}] ignoring \'flow=none\' {command}: '
f'task already has {repr_flow_nums(active_a.flow_nums)}'
),
level=logging.ERROR
)

do_command([active_x.identity], flow=[FLOW_NEW])
assert active_x.flow_nums == {1, 2, 3}
do_command([active_a.identity], flow=[FLOW_NEW])
assert active_a.flow_nums == {1, 2, 3}

# -----(2. Test inactive tasks)-----
do_command = functools.partial(
schd.pool.set_prereqs_and_outputs, outputs=[], prereqs=['all']
)
if command == 'set':
do_command = functools.partial(
schd.pool.set_prereqs_and_outputs, outputs=[], prereqs=['all']
)

# By default inactive tasks get all active flows.
do_command(['1/a'], flow=[])
assert schd.pool._get_task_by_id('1/a').flow_nums == {1, 2, 3}

# set --pre=all should not cause a task to submit in a paused workflow
assert len(submitted_tasks) == 0
# but triggering it should
schd.pool.force_trigger_tasks(['1/a'], [])
schd.release_tasks_to_run()
assert len(submitted_tasks) == 1

# Else assign requested flows.
do_command(['1/b'], flow=[FLOW_NONE])
assert schd.pool._get_task_by_id('1/b').flow_nums == set()
Expand All @@ -181,93 +166,5 @@ async def test_flow_assignment_set(

do_command(['1/d'], flow=[FLOW_ALL])
assert schd.pool._get_task_by_id('1/d').flow_nums == {1, 2, 3, 4}

do_command(['1/e'], flow=[7])
assert schd.pool._get_task_by_id('1/e').flow_nums == {7}


async def test_flow_assignment_trigger(
flow, scheduler, start, log_filter: Callable
):
"""Test flow assignment when triggering tasks.
Active tasks:
By default keep existing flows, else merge with requested flows.
Inactive tasks:
By default assign active flows; else assign requested flows.
Note this differs from the 'set' test above because triggering a
task once makes it active (even in a paused workflow) after which
additional triggers get ignored.
"""
conf = {
'scheduler': {
'allow implicit tasks': 'True'
},
'scheduling': {
'graph': {
'R1': "foo & bar => a & b & c & d & e"
}
},
'runtime': {
'foo': {
'outputs': {'x': 'x'}
}
},
}
id_ = flow(conf)
schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True)
async with start(schd) as log:

command = 'trigger'
do_command = schd.pool.force_trigger_tasks

# get active foo and bar (which is which doesn't matter)
# ("active" here means in the active task pool).
active_x, active_y = schd.pool.get_tasks()
schd.pool.merge_flows(active_y, schd.pool._get_flow_nums([FLOW_NEW]))
assert active_x.flow_nums == {1}
assert active_y.flow_nums == {1, 2}

# -----(1. Test active tasks)-----

# By default active tasks keep existing flow assignment.
# This trigger makes the task literally active (submitted, running)
do_command([active_x.identity], flow=[])
schd.release_tasks_to_run()
assert active_x.flow_nums == {1}

# Triggering it again will be ignored (already active).
do_command([active_x.identity], flow=[FLOW_ALL])
schd.release_tasks_to_run()
assert active_x.flow_nums == {1}
assert log_filter(
contains=(
f'[{active_x}] ignoring trigger - already active'
),
level=logging.ERROR
)

# -----(2. Test inactive tasks)-----
# By default inactive tasks get all active flows.
do_command(['1/a'], flow=[])
schd.release_tasks_to_run()
assert schd.pool._get_task_by_id('1/a').flow_nums == {1, 2}

# Else assign requested flows.
do_command(['1/b'], flow=[FLOW_NONE])
schd.release_tasks_to_run()
assert schd.pool._get_task_by_id('1/b').flow_nums == set()

do_command(['1/c'], flow=[FLOW_NEW])
schd.release_tasks_to_run()
assert schd.pool._get_task_by_id('1/c').flow_nums == {3}

do_command(['1/d'], flow=[FLOW_ALL])
schd.release_tasks_to_run()
assert schd.pool._get_task_by_id('1/d').flow_nums == {1, 2, 3}

do_command(['1/e'], flow=[7])
schd.release_tasks_to_run()
assert schd.pool._get_task_by_id('1/e').flow_nums == {7}

0 comments on commit a246236

Please sign in to comment.