Skip to content

Commit

Permalink
Remove reparse on DAG depending on asset alias (apache#44866)
Browse files Browse the repository at this point in the history
I suspect we no longer need this since alias resolution now happens very
late in the scheduling process. A DAG *should* be able to automatically
understand the alias is resolved without a reparse after refactorings
introduced in cccc933.
  • Loading branch information
uranusjr authored Dec 16, 2024
1 parent 707c564 commit aaf29ee
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 7 deletions.
5 changes: 0 additions & 5 deletions airflow/assets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,6 @@ def register_asset_change(
if alias_ref.dag.is_active and not alias_ref.dag.is_paused
}

dags_to_reparse = dags_to_queue_from_asset_alias - dags_to_queue_from_asset
if dags_to_reparse:
file_locs = {dag.fileloc for dag in dags_to_reparse}
cls._send_dag_priority_parsing_request(file_locs, session)

cls.notify_asset_changed(asset=asset)

Stats.incr("asset.updates")
Expand Down
2 changes: 0 additions & 2 deletions tests/assets/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
DagScheduleAssetReference,
)
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagPriorityParsingRequest
from airflow.sdk.definitions.asset import Asset, AssetAlias

from tests.listeners import asset_listener
Expand Down Expand Up @@ -139,7 +138,6 @@ def test_register_asset_change_with_alias(self, session, dag_maker, mock_task_in
# Ensure we've created an asset
assert session.query(AssetEvent).filter_by(asset_id=asm.id).count() == 1
assert session.query(AssetDagRunQueue).count() == 2
assert session.query(DagPriorityParsingRequest).count() == 2

def test_register_asset_change_no_downstreams(self, session, mock_task_instance):
asset_manager = AssetManager()
Expand Down

0 comments on commit aaf29ee

Please sign in to comment.