diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index dcc78e8d8a0e..2f0b7d568166 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -2090,34 +2090,39 @@ def teardown(self, kill_actors: bool = False): return logger.info("Tearing down compiled DAG") - outer._dag_submitter.close() - outer._dag_output_fetcher.close() - - for actor in outer.actor_refs: - logger.info(f"Cancelling compiled worker on actor: {actor}") - # Cancel all actor loops in parallel. - cancel_refs = [ - actor.__ray_call__.remote(do_cancel_executable_tasks, tasks) - for actor, tasks in outer.actor_to_executable_tasks.items() - ] - for cancel_ref in cancel_refs: - try: - ray.get(cancel_ref, timeout=30) - except RayChannelError: - # Channel error happens when a channel is closed - # or timed out. In this case, do not log. - pass - except Exception: - logger.exception("Error cancelling worker task") - pass - - for ( - communicator_id - ) in outer._actors_to_created_communicator_id.values(): - _destroy_communicator(communicator_id) - - logger.info("Waiting for worker tasks to exit") - self.wait_teardown(kill_actors=kill_actors) + try: + outer._dag_submitter.close() + outer._dag_output_fetcher.close() + + for actor in outer.actor_refs: + logger.info(f"Cancelling compiled worker on actor: {actor}") + # Cancel all actor loops in parallel. + cancel_refs = [ + actor.__ray_call__.remote(do_cancel_executable_tasks, tasks) + for actor, tasks in outer.actor_to_executable_tasks.items() + ] + for cancel_ref in cancel_refs: + try: + ray.get(cancel_ref, timeout=30) + except RayChannelError: + # Channel error happens when a channel is closed + # or timed out. In this case, do not log. + pass + except Exception: + logger.exception("Error cancelling worker task") + pass + + for ( + communicator_id + ) in outer._actors_to_created_communicator_id.values(): + _destroy_communicator(communicator_id) + + logger.info("Waiting for worker tasks to exit") + self.wait_teardown(kill_actors=kill_actors) + except ReferenceError: + # Python destruction order is not guaranteed, so we may + # when accessing attributes of `outer` when it is being destroyed. + logger.info("Compiled DAG is already destroyed") logger.info("Teardown complete") self._teardown_done = True