From f9c30a0728865dea682e7fa8502809a7475a2845 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:22:22 +0000 Subject: [PATCH] Revert switch from Tokens to string IDs --- cylc/flow/scheduler.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 2a66ade7dc..a2d1036d1d 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -92,7 +92,7 @@ get_user, is_remote_platform, ) -from cylc.flow.id import Tokens, quick_relative_id +from cylc.flow.id import Tokens from cylc.flow.log_level import ( verbosity_to_env, verbosity_to_opts, @@ -1099,9 +1099,9 @@ def remove_tasks( if flow_nums is None: flow_nums = set() - # Mapping of relative task IDs to removed flow numbers: - removed: Dict[str, FlowNums] = {} - not_removed: Set[str] = set() + # Mapping of *relative* task IDs to removed flow numbers: + removed: Dict[Tokens, FlowNums] = {} + not_removed: Set[Tokens] = set() # All the matched tasks (will add applicable active tasks below): matched_tasks = inactive.copy() to_kill: List[TaskProxy] = [] @@ -1109,9 +1109,9 @@ def remove_tasks( for itask in active: fnums_to_remove = itask.match_flows(flow_nums) if not fnums_to_remove: - not_removed.add(itask.identity) + not_removed.add(itask.tokens.task) continue - removed[itask.identity] = fnums_to_remove + removed[itask.tokens.task] = fnums_to_remove matched_tasks.add((itask.tdef, itask.point)) if fnums_to_remove == itask.flow_nums: # Need to remove the task from the pool. @@ -1124,7 +1124,7 @@ def remove_tasks( itask.flow_nums.difference_update(fnums_to_remove) for tdef, point in matched_tasks: - task_id = quick_relative_id(point, tdef.name) + tokens = Tokens(cycle=str(point), task=tdef.name) # Go through any tasks downstream of this matched task to see if # any need to stand down as a result of this task being removed: @@ -1144,9 +1144,9 @@ def remove_tasks( ): # Unset any prereqs naturally satisfied by these tasks # (do not unset those satisfied by `cylc set --pre`): - if prereq.unset_naturally_satisfied(task_id): + if prereq.unset_naturally_satisfied(tokens.relative_id): prereqs_changed = True - removed.setdefault(task_id, set()).update( + removed.setdefault(tokens, set()).update( fnums_to_remove ) if not prereqs_changed: @@ -1182,10 +1182,10 @@ def remove_tasks( str(point), tdef.name, flow_nums, ) if db_removed_fnums: - removed.setdefault(task_id, set()).update(db_removed_fnums) + removed.setdefault(tokens, set()).update(db_removed_fnums) - if task_id not in removed: - not_removed.add(task_id) + if tokens not in removed: + not_removed.add(tokens) if to_kill: self.kill_tasks(to_kill, warn=False) @@ -1193,9 +1193,11 @@ def remove_tasks( if removed: tasks_str_list = [] for task, fnums in removed.items(): - self.data_store_mgr.delta_remove_task_flow_nums(task, fnums) + self.data_store_mgr.delta_remove_task_flow_nums( + task.relative_id, fnums + ) tasks_str_list.append( - f"{task} {repr_flow_nums(fnums, full=True)}" + f"{task.relative_id} {repr_flow_nums(fnums, full=True)}" ) LOG.info(f"Removed task(s): {', '.join(sorted(tasks_str_list))}") @@ -1203,7 +1205,9 @@ def remove_tasks( fnums_str = ( repr_flow_nums(flow_nums, full=True) if flow_nums else '' ) - tasks_str = ', '.join(sorted(not_removed)) + tasks_str = ', '.join( + sorted(tokens.relative_id for tokens in not_removed) + ) LOG.warning(f"Task(s) not removable: {tasks_str} {fnums_str}") if removed and self.pool.compute_runahead():