Skip to content

Commit

Permalink
Fix ApiException handling when adopting completed pods (apache#41109)
Browse files Browse the repository at this point in the history
When trying to complete adopted pods, if we encounter an ApiException,
we should assume we were unable to adopt the pod meaning we shouldn't
add that to our `running` set. If we do, `running` can fill up over time
with tasks the executor isn't actually watching.
  • Loading branch information
jedcunningham authored Jul 31, 2024
1 parent 16be0e8 commit b014077
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,8 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
continue

ti_id = annotations_to_key(pod.metadata.annotations)
self.running.add(ti_id)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,41 @@ def get_annotations(pod_name):
)
assert executor.running == expected_running_ti_keys

@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_adopt_completed_pods_api_exception(self, mock_kube_client, mock_kube_dynamic_client):
"""We should gracefully handle exceptions when adopting completed pods from other schedulers"""
executor = self.kubernetes_executor
executor.scheduler_job_id = "modified"
executor.kube_client = mock_kube_client
executor.kube_config.kube_namespace = "somens"
pod_names = ["one", "two"]

def get_annotations(pod_name):
return {
"dag_id": "dag",
"run_id": "run_id",
"task_id": pod_name,
"try_number": "1",
}

mock_kube_dynamic_client.return_value.get.return_value.items = [
k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
name=pod_name,
labels={"airflow-worker": pod_name},
annotations=get_annotations(pod_name),
namespace="somens",
)
)
for pod_name in pod_names
]

mock_kube_client.patch_namespaced_pod.side_effect = ApiException(status=400)
executor._adopt_completed_pods(mock_kube_client)
assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count
assert executor.running == set()

@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_not_adopt_unassigned_task(self, mock_kube_client):
"""
Expand Down

0 comments on commit b014077

Please sign in to comment.