Skip to content

Commit

Permalink
Revert switch from Tokens to string IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Dec 5, 2024
1 parent 35819a5 commit f9c30a0
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
get_user,
is_remote_platform,
)
from cylc.flow.id import Tokens, quick_relative_id
from cylc.flow.id import Tokens
from cylc.flow.log_level import (
verbosity_to_env,
verbosity_to_opts,
Expand Down Expand Up @@ -1099,19 +1099,19 @@ def remove_tasks(

if flow_nums is None:
flow_nums = set()
# Mapping of relative task IDs to removed flow numbers:
removed: Dict[str, FlowNums] = {}
not_removed: Set[str] = set()
# Mapping of *relative* task IDs to removed flow numbers:
removed: Dict[Tokens, FlowNums] = {}
not_removed: Set[Tokens] = set()
# All the matched tasks (will add applicable active tasks below):
matched_tasks = inactive.copy()
to_kill: List[TaskProxy] = []

for itask in active:
fnums_to_remove = itask.match_flows(flow_nums)
if not fnums_to_remove:
not_removed.add(itask.identity)
not_removed.add(itask.tokens.task)
continue
removed[itask.identity] = fnums_to_remove
removed[itask.tokens.task] = fnums_to_remove
matched_tasks.add((itask.tdef, itask.point))
if fnums_to_remove == itask.flow_nums:
# Need to remove the task from the pool.
Expand All @@ -1124,7 +1124,7 @@ def remove_tasks(
itask.flow_nums.difference_update(fnums_to_remove)

for tdef, point in matched_tasks:
task_id = quick_relative_id(point, tdef.name)
tokens = Tokens(cycle=str(point), task=tdef.name)

# Go through any tasks downstream of this matched task to see if
# any need to stand down as a result of this task being removed:
Expand All @@ -1144,9 +1144,9 @@ def remove_tasks(
):
# Unset any prereqs naturally satisfied by these tasks
# (do not unset those satisfied by `cylc set --pre`):
if prereq.unset_naturally_satisfied(task_id):
if prereq.unset_naturally_satisfied(tokens.relative_id):
prereqs_changed = True
removed.setdefault(task_id, set()).update(
removed.setdefault(tokens, set()).update(
fnums_to_remove
)
if not prereqs_changed:
Expand Down Expand Up @@ -1182,28 +1182,32 @@ def remove_tasks(
str(point), tdef.name, flow_nums,
)
if db_removed_fnums:
removed.setdefault(task_id, set()).update(db_removed_fnums)
removed.setdefault(tokens, set()).update(db_removed_fnums)

if task_id not in removed:
not_removed.add(task_id)
if tokens not in removed:
not_removed.add(tokens)

if to_kill:
self.kill_tasks(to_kill, warn=False)

if removed:
tasks_str_list = []
for task, fnums in removed.items():
self.data_store_mgr.delta_remove_task_flow_nums(task, fnums)
self.data_store_mgr.delta_remove_task_flow_nums(
task.relative_id, fnums
)
tasks_str_list.append(
f"{task} {repr_flow_nums(fnums, full=True)}"
f"{task.relative_id} {repr_flow_nums(fnums, full=True)}"
)
LOG.info(f"Removed task(s): {', '.join(sorted(tasks_str_list))}")

if not_removed:
fnums_str = (
repr_flow_nums(flow_nums, full=True) if flow_nums else ''
)
tasks_str = ', '.join(sorted(not_removed))
tasks_str = ', '.join(
sorted(tokens.relative_id for tokens in not_removed)
)
LOG.warning(f"Task(s) not removable: {tasks_str} {fnums_str}")

if removed and self.pool.compute_runahead():
Expand Down

0 comments on commit f9c30a0

Please sign in to comment.