From d32832c671884554a1bf2adafba4de5d45ca9487 Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Wed, 30 Aug 2023 20:01:41 +0530 Subject: [PATCH] Remove SHUTDOWN status (#2027) # Description Remove the task instance state which no longer exists. --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Ephraim Anierobi --- python-sdk/src/astro/sql/operators/cleanup.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/python-sdk/src/astro/sql/operators/cleanup.py b/python-sdk/src/astro/sql/operators/cleanup.py index f5c5951e7..e1acdbbbb 100644 --- a/python-sdk/src/astro/sql/operators/cleanup.py +++ b/python-sdk/src/astro/sql/operators/cleanup.py @@ -19,8 +19,7 @@ MappedOperator = None -from airflow.models.taskinstance import TaskInstance -from airflow.utils.state import State +from airflow.models.taskinstance import TaskInstance, TaskInstanceState from astro.databases import create_database from astro.sql.operators.base_decorator import BaseSQLDecoratedOperator @@ -134,7 +133,7 @@ def _has_task_failed(self, task_instances: list[TaskInstance]) -> bool: failed_tasks = [ (ti.task_id, ti.state) for ti in task_instances - if ti.task_id != self.task_id and ti.state == State.FAILED + if ti.task_id != self.task_id and ti.state == TaskInstanceState.FAILED ] if failed_tasks: self.log.info( @@ -159,12 +158,11 @@ def _is_dag_running(self, task_instances: list[TaskInstance]) -> bool: if ti.task_id != self.task_id and ti.state not in [ - State.SUCCESS, - State.FAILED, - State.SKIPPED, - State.UPSTREAM_FAILED, - State.REMOVED, - State.SHUTDOWN, + TaskInstanceState.SUCCESS, + TaskInstanceState.FAILED, + TaskInstanceState.SKIPPED, + TaskInstanceState.UPSTREAM_FAILED, + TaskInstanceState.REMOVED, ] ] if running_tasks: