diff --git a/airflow/providers/celery/executors/celery_kubernetes_executor.py b/airflow/providers/celery/executors/celery_kubernetes_executor.py index e981e75fa15e5..bc2ed7904f5a5 100644 --- a/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -116,6 +116,11 @@ def slots_available(self) -> int: """Number of new tasks this executor instance can accept.""" return self.celery_executor.slots_available + @property + def slots_occupied(self): + """Number of tasks this executor instance is currently managing.""" + return len(self.running) + len(self.queued_tasks) + def queue_command( self, task_instance: TaskInstance, diff --git a/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index 8c948b0d64969..75de1101c59ba 100644 --- a/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -103,6 +103,11 @@ def slots_available(self) -> int: """Number of new tasks this executor instance can accept.""" return self.local_executor.slots_available + @property + def slots_occupied(self): + """Number of tasks this executor instance is currently managing.""" + return len(self.running) + len(self.queued_tasks) + def queue_command( self, task_instance: TaskInstance,