Skip to content

Commit

Permalink
[edge] Clean up of dead tasks in edge_jobs table (apache#44280)
Browse files Browse the repository at this point in the history
* Add edge_job clean up

* Reworked unit test

* Reworked unit test

---------

Co-authored-by: Marco Küttelwesch <[email protected]>
  • Loading branch information
AutomationDev85 and Marco Küttelwesch authored Nov 22, 2024
1 parent 5b2a96e commit d79c6c2
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 13 deletions.
8 changes: 8 additions & 0 deletions providers/src/airflow/providers/edge/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
Changelog
---------

0.6.1pre0
.........

Misc
~~~~

* ``Update jobs or edge workers who have been killed to clean up job table.``

0.6.0pre0
.........

Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "0.6.0pre0"
__version__ = "0.6.1pre0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
Expand Down
41 changes: 36 additions & 5 deletions providers/src/airflow/providers/edge/executors/edge_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.models.abstractoperator import DEFAULT_QUEUE
from airflow.models.taskinstance import TaskInstanceState
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.providers.edge.cli.edge_command import EDGE_COMMANDS
from airflow.providers.edge.models.edge_job import EdgeJobModel
from airflow.providers.edge.models.edge_logs import EdgeLogsModel
Expand All @@ -42,7 +42,6 @@
from sqlalchemy.orm import Session

from airflow.executors.base_executor import CommandType
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey

PARALLELISM: int = conf.getint("core", "PARALLELISM")
Expand Down Expand Up @@ -108,6 +107,30 @@ def _check_worker_liveness(self, session: Session) -> bool:

return changed

def _update_orphaned_jobs(self, session: Session) -> bool:
    """Update status of jobs when workers die and don't update anymore.

    A RUNNING job whose last update is older than the scheduler's zombie
    threshold is assumed to have lost its worker. Its state is re-synced
    from the matching TaskInstance, or set to REMOVED when no task
    instance exists anymore (so a later purge can clean it up).

    :param session: database session used for querying and updating jobs.
    :return: True if at least one orphaned job was found (and updated).
    """
    zombie_threshold = conf.getint("scheduler", "scheduler_zombie_task_threshold")
    cutoff = timezone.utcnow() - timedelta(seconds=zombie_threshold)

    stale_jobs: list[EdgeJobModel] = (
        session.query(EdgeJobModel)
        .filter(
            EdgeJobModel.state == TaskInstanceState.RUNNING,
            EdgeJobModel.last_update < cutoff,
        )
        .all()
    )

    for stale_job in stale_jobs:
        # Re-sync with the scheduler's view of the task; if the task
        # instance is gone entirely, flag the job for removal.
        task_instance = TaskInstance.get_task_instance(
            dag_id=stale_job.dag_id,
            run_id=stale_job.run_id,
            task_id=stale_job.task_id,
            map_index=stale_job.map_index,
            session=session,
        )
        stale_job.state = task_instance.state if task_instance else TaskInstanceState.REMOVED

    return bool(stale_jobs)

def _purge_jobs(self, session: Session) -> bool:
"""Clean finished jobs."""
purged_marker = False
Expand All @@ -117,7 +140,12 @@ def _purge_jobs(self, session: Session) -> bool:
session.query(EdgeJobModel)
.filter(
EdgeJobModel.state.in_(
[TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS, TaskInstanceState.FAILED]
[
TaskInstanceState.RUNNING,
TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED,
TaskInstanceState.REMOVED,
]
)
)
.all()
Expand Down Expand Up @@ -145,7 +173,7 @@ def _purge_jobs(self, session: Session) -> bool:
job.state == TaskInstanceState.SUCCESS
and job.last_update_t < (datetime.now() - timedelta(minutes=job_success_purge)).timestamp()
) or (
job.state == TaskInstanceState.FAILED
job.state in (TaskInstanceState.FAILED, TaskInstanceState.REMOVED)
and job.last_update_t < (datetime.now() - timedelta(minutes=job_fail_purge)).timestamp()
):
if job.key in self.last_reported_state:
Expand All @@ -168,7 +196,10 @@ def _purge_jobs(self, session: Session) -> bool:
def sync(self, session: Session = NEW_SESSION) -> None:
    """Sync will get called periodically by the heartbeat method."""
    with Stats.timer("edge_executor.sync.duration"):
        # Run every maintenance step unconditionally (no short-circuit:
        # each one must get a chance to touch the database) and commit
        # once if any of them reported a change.
        changes = [
            self._update_orphaned_jobs(session),
            self._purge_jobs(session),
            self._check_worker_liveness(session),
        ]
        if any(changes):
            session.commit()

def end(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ source-date-epoch: 1729683247

# note that those versions are maintained by release manager - do not update them manually
versions:
- 0.6.0pre0
- 0.6.1pre0

dependencies:
- apache-airflow>=2.10.0
Expand Down
59 changes: 53 additions & 6 deletions providers/tests/edge/executors/test_edge_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
# under the License.
from __future__ import annotations

from datetime import datetime
from datetime import datetime, timedelta
from unittest.mock import patch

import pytest

from airflow.configuration import conf
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.edge.executors.edge_executor import EdgeExecutor
from airflow.providers.edge.models.edge_job import EdgeJobModel
Expand Down Expand Up @@ -65,6 +66,44 @@ def test_execute_async_ok_command(self):
assert jobs[0].run_id == "test_run"
assert jobs[0].task_id == "test_task"

def test_sync_orphaned_tasks(self):
    """Verify sync() reconciles orphaned RUNNING jobs and purges REMOVED ones.

    Two jobs are seeded:

    * a RUNNING job whose ``last_update`` is older than the scheduler's
      zombie threshold, so it is treated as orphaned (state re-synced,
      but the row is kept), and
    * a REMOVED job old enough to pass the fail-purge threshold, so it
      is deleted from the edge_jobs table.
    """
    executor = EdgeExecutor()

    # Ages that push the jobs past the purge / orphan-detection cutoffs.
    delta_to_purge = timedelta(minutes=conf.getint("edge", "job_fail_purge") + 1)
    delta_to_orphaned = timedelta(seconds=conf.getint("scheduler", "scheduler_zombie_task_threshold") + 1)

    with create_session() as session:
        for task_id, state, last_update in [
            (
                "started_running_orphaned",
                TaskInstanceState.RUNNING,
                timezone.utcnow() - delta_to_orphaned,
            ),
            ("started_removed", TaskInstanceState.REMOVED, timezone.utcnow() - delta_to_purge),
        ]:
            session.add(
                EdgeJobModel(
                    dag_id="test_dag",
                    task_id=task_id,
                    run_id="test_run",
                    map_index=-1,
                    try_number=1,
                    state=state,
                    queue="default",
                    command="dummy",
                    last_update=last_update,
                )
            )
        session.commit()

    executor.sync()

    with create_session() as session:
        jobs = session.query(EdgeJobModel).all()
        # The REMOVED job must be purged; only the orphaned job remains.
        # (The original test asserted the task_id twice — duplicate removed.)
        assert len(jobs) == 1
        assert jobs[0].task_id == "started_running_orphaned"

@patch("airflow.providers.edge.executors.edge_executor.EdgeExecutor.running_state")
@patch("airflow.providers.edge.executors.edge_executor.EdgeExecutor.success")
@patch("airflow.providers.edge.executors.edge_executor.EdgeExecutor.fail")
Expand All @@ -77,12 +116,14 @@ def remove_from_running(key: TaskInstanceKey):
mock_success.side_effect = remove_from_running
mock_fail.side_effect = remove_from_running

delta_to_purge = timedelta(minutes=conf.getint("edge", "job_fail_purge") + 1)

# Prepare some data
with create_session() as session:
for task_id, state in [
("started_running", TaskInstanceState.RUNNING),
("started_success", TaskInstanceState.SUCCESS),
("started_failed", TaskInstanceState.FAILED),
for task_id, state, last_update in [
("started_running", TaskInstanceState.RUNNING, timezone.utcnow()),
("started_success", TaskInstanceState.SUCCESS, timezone.utcnow() - delta_to_purge),
("started_failed", TaskInstanceState.FAILED, timezone.utcnow() - delta_to_purge),
]:
session.add(
EdgeJobModel(
Expand All @@ -94,7 +135,7 @@ def remove_from_running(key: TaskInstanceKey):
state=state,
queue="default",
command="dummy",
last_update=timezone.utcnow(),
last_update=last_update,
)
)
key = TaskInstanceKey(
Expand All @@ -106,6 +147,12 @@ def remove_from_running(key: TaskInstanceKey):

executor.sync()

with create_session() as session:
jobs = session.query(EdgeJobModel).all()
assert len(session.query(EdgeJobModel).all()) == 1
assert jobs[0].task_id == "started_running"
assert jobs[0].state == TaskInstanceState.RUNNING

assert len(executor.running) == 1
mock_running_state.assert_called_once()
mock_success.assert_called_once()
Expand Down

0 comments on commit d79c6c2

Please sign in to comment.