Skip to content

Commit

Permalink
[core][cgraph] Fix flaky test_dag_exception_chained
Browse files Browse the repository at this point in the history
Signed-off-by: Rui Qiao <[email protected]>
  • Loading branch information
ruisearch42 committed Feb 5, 2025
1 parent e3680f7 commit 30aec6b
Showing 1 changed file with 33 additions and 28 deletions.
61 changes: 33 additions & 28 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2090,34 +2090,39 @@ def teardown(self, kill_actors: bool = False):
return

logger.info("Tearing down compiled DAG")
outer._dag_submitter.close()
outer._dag_output_fetcher.close()

for actor in outer.actor_refs:
logger.info(f"Cancelling compiled worker on actor: {actor}")
# Cancel all actor loops in parallel.
cancel_refs = [
actor.__ray_call__.remote(do_cancel_executable_tasks, tasks)
for actor, tasks in outer.actor_to_executable_tasks.items()
]
for cancel_ref in cancel_refs:
try:
ray.get(cancel_ref, timeout=30)
except RayChannelError:
# Channel error happens when a channel is closed
# or timed out. In this case, do not log.
pass
except Exception:
logger.exception("Error cancelling worker task")
pass

for (
communicator_id
) in outer._actors_to_created_communicator_id.values():
_destroy_communicator(communicator_id)

logger.info("Waiting for worker tasks to exit")
self.wait_teardown(kill_actors=kill_actors)
try:
outer._dag_submitter.close()
outer._dag_output_fetcher.close()

for actor in outer.actor_refs:
logger.info(f"Cancelling compiled worker on actor: {actor}")
# Cancel all actor loops in parallel.
cancel_refs = [
actor.__ray_call__.remote(do_cancel_executable_tasks, tasks)
for actor, tasks in outer.actor_to_executable_tasks.items()
]
for cancel_ref in cancel_refs:
try:
ray.get(cancel_ref, timeout=30)
except RayChannelError:
# Channel error happens when a channel is closed
# or timed out. In this case, do not log.
pass
except Exception:
logger.exception("Error cancelling worker task")
pass

for (
communicator_id
) in outer._actors_to_created_communicator_id.values():
_destroy_communicator(communicator_id)

logger.info("Waiting for worker tasks to exit")
self.wait_teardown(kill_actors=kill_actors)
except ReferenceError:
# Python destruction order is not guaranteed, so we may
# when accessing attributes of `outer` when it is being destroyed.
logger.info("Compiled DAG is already destroyed")
logger.info("Teardown complete")
self._teardown_done = True

Expand Down

0 comments on commit 30aec6b

Please sign in to comment.