From a6f612d89942f141eb8a7affbbea46d033923d1a Mon Sep 17 00:00:00 2001
From: Andrey Anshin
Date: Fri, 19 Apr 2024 12:51:21 +0400
Subject: [PATCH] Rename model `ImportError` to `ParseImportError` to avoid
 shadowing the builtin exception (#39116)

---
 airflow/api/common/delete_dag.py              |  5 +--
 .../endpoints/import_error_endpoint.py        | 12 +++----
 airflow/api_connexion/schemas/error_schema.py |  6 ++--
 airflow/dag_processing/manager.py             |  8 ++---
 airflow/dag_processing/processor.py           | 13 ++++----
 airflow/models/__init__.py                    | 30 ++++++++++++-----
 airflow/models/errors.py                      | 19 ++++++++++-
 airflow/www/utils.py                          |  4 +--
 airflow/www/views.py                          |  7 ++--
 pyproject.toml                                |  2 ++
 .../endpoints/test_import_error_endpoint.py   | 30 ++++++++---------
 .../schemas/test_error_schema.py              |  8 ++---
 .../common/test_delete_dag.py                 |  2 +-
 tests/dag_processing/test_job_runner.py       | 11 ++++---
 tests/dag_processing/test_processor.py        | 33 ++++++++++---------
 tests/test_utils/db.py                        |  4 +--
 16 files changed, 116 insertions(+), 78 deletions(-)

diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index 4452e2726f934..1cf7ffec8b9e4 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -27,6 +27,7 @@
 from airflow import models
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.models import DagModel, TaskFail
+from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils.db import get_sqla_model_classes
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -99,8 +100,8 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
     # Delete entries in Import Errors table for a deleted DAG
     # This handles the case when the dag_id is changed in the file
     session.execute(
-        delete(models.ImportError)
-        .where(models.ImportError.filename == dag.fileloc)
+        delete(ParseImportError)
+        .where(ParseImportError.filename == dag.fileloc)
         .execution_options(synchronize_session="fetch")
     )
 
diff --git a/airflow/api_connexion/endpoints/import_error_endpoint.py b/airflow/api_connexion/endpoints/import_error_endpoint.py
index 274d842d1818e..76b706eac1ae4 100644
--- a/airflow/api_connexion/endpoints/import_error_endpoint.py
+++ b/airflow/api_connexion/endpoints/import_error_endpoint.py
@@ -30,7 +30,7 @@
 )
 from airflow.auth.managers.models.resource_details import AccessView, DagDetails
 from airflow.models.dag import DagModel
-from airflow.models.errors import ImportError as ImportErrorModel
+from airflow.models.errors import ParseImportError
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.www.extensions.init_auth_manager import get_auth_manager
 
@@ -45,7 +45,7 @@
 @provide_session
 def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION) -> APIResponse:
     """Get an import error."""
-    error = session.get(ImportErrorModel, import_error_id)
+    error = session.get(ParseImportError, import_error_id)
     if error is None:
         raise NotFound(
             "Import error not found",
@@ -85,8 +85,8 @@ def get_import_errors(
     """Get all import errors."""
     to_replace = {"import_error_id": "id"}
     allowed_sort_attrs = ["import_error_id", "timestamp", "filename"]
-    count_query = select(func.count(ImportErrorModel.id))
-    query = select(ImportErrorModel)
+    count_query = select(func.count(ParseImportError.id))
+    query = select(ParseImportError)
     query = apply_sorting(query, order_by, to_replace, allowed_sort_attrs)
     can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
@@ -95,8 +95,8 @@ def get_import_errors(
         # if the user doesn't have access to all DAGs, only display errors from visible DAGs
         readable_dag_ids = security.get_readable_dags()
         dagfiles_stmt = select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(readable_dag_ids))
-        query = query.where(ImportErrorModel.filename.in_(dagfiles_stmt))
-        count_query = count_query.where(ImportErrorModel.filename.in_(dagfiles_stmt))
+        query = query.where(ParseImportError.filename.in_(dagfiles_stmt))
+        count_query = count_query.where(ParseImportError.filename.in_(dagfiles_stmt))
 
     total_entries = session.scalars(count_query).one()
     import_errors = session.scalars(query.offset(offset).limit(limit)).all()
diff --git a/airflow/api_connexion/schemas/error_schema.py b/airflow/api_connexion/schemas/error_schema.py
index 97e12c584eb0b..8f117fb9666b2 100644
--- a/airflow/api_connexion/schemas/error_schema.py
+++ b/airflow/api_connexion/schemas/error_schema.py
@@ -21,7 +21,7 @@
 from marshmallow import Schema, fields
 from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
 
-from airflow.models.errors import ImportError
+from airflow.models.errors import ParseImportError
 
 
 class ImportErrorSchema(SQLAlchemySchema):
@@ -30,7 +30,7 @@ class ImportErrorSchema(SQLAlchemySchema):
     class Meta:
         """Meta."""
 
-        model = ImportError
+        model = ParseImportError
 
     import_error_id = auto_field("id", dump_only=True)
     timestamp = auto_field(format="iso", dump_only=True)
@@ -41,7 +41,7 @@ class Meta:
 class ImportErrorCollection(NamedTuple):
     """List of import errors with metadata."""
 
-    import_errors: list[ImportError]
+    import_errors: list[ParseImportError]
     total_entries: int
 
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index ef9ac5c44ac38..b2931feb5d756 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -45,10 +45,10 @@
 from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
 from airflow.configuration import conf
 from airflow.dag_processing.processor import DagFileProcessorProcess
-from airflow.models import errors
 from airflow.models.dag import DagModel
 from airflow.models.dagwarning import DagWarning
 from airflow.models.db_callback_request import DbCallbackRequest
+from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.secrets.cache import SecretCache
 from airflow.stats import Stats
@@ -803,12 +803,12 @@ def clear_nonexistent_import_errors(
         :param file_paths: list of paths to DAG definition files
         :param session: session for ORM operations
         """
-        query = delete(errors.ImportError)
+        query = delete(ParseImportError)
 
         if file_paths:
             query = query.where(
-                ~errors.ImportError.filename.in_(file_paths),
-                errors.ImportError.processor_subdir == processor_subdir,
+                ~ParseImportError.filename.in_(file_paths),
+                ParseImportError.processor_subdir == processor_subdir,
             )
 
         session.execute(query.execution_options(synchronize_session="fetch"))
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 99c965682644b..f813a1beb2108 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -39,11 +39,12 @@
 )
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, TaskNotFound
-from airflow.models import SlaMiss, errors
+from airflow.models import SlaMiss
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun as DR
 from airflow.models.dagwarning import DagWarning, DagWarningType
+from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance, TaskInstance as TI
 from airflow.stats import Stats
@@ -613,24 +614,24 @@ def update_import_errors(
         # that no longer have errors
         for dagbag_file in files_without_error:
             session.execute(
-                delete(errors.ImportError)
-                .where(errors.ImportError.filename.startswith(dagbag_file))
+                delete(ParseImportError)
+                .where(ParseImportError.filename.startswith(dagbag_file))
                 .execution_options(synchronize_session="fetch")
             )
 
         # files that still have errors
-        existing_import_error_files = [x.filename for x in session.query(errors.ImportError.filename).all()]
+        existing_import_error_files = [x.filename for x in session.query(ParseImportError.filename).all()]
 
         # Add the errors of the processed files
         for filename, stacktrace in import_errors.items():
             if filename in existing_import_error_files:
-                session.query(errors.ImportError).filter(errors.ImportError.filename == filename).update(
+                session.query(ParseImportError).filter(ParseImportError.filename == filename).update(
                     {"filename": filename, "timestamp": timezone.utcnow(), "stacktrace": stacktrace},
                     synchronize_session="fetch",
                 )
             else:
                 session.add(
-                    errors.ImportError(
+                    ParseImportError(
                         filename=filename,
                         timestamp=timezone.utcnow(),
                         stacktrace=stacktrace,
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index c5462c57f8b34..68bc4d8c14df0 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -34,7 +34,6 @@
     "DagRun",
     "DagTag",
     "DbCallbackRequest",
-    "ImportError",
     "Log",
     "MappedOperator",
     "Operator",
@@ -62,19 +61,36 @@ def import_all_models():
     import airflow.models.dagwarning
     import airflow.models.dataset
+    import airflow.models.errors
     import airflow.models.serialized_dag
     import airflow.models.tasklog
 
 
 def __getattr__(name):
     # PEP-562: Lazy loaded attributes on python modules
-    path = __lazy_imports.get(name)
-    if not path:
-        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
+    if name != "ImportError":
+        path = __lazy_imports.get(name)
+        if not path:
+            raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
 
-    from airflow.utils.module_loading import import_string
+        from airflow.utils.module_loading import import_string
+
+        val = import_string(f"{path}.{name}")
+    else:
+        import warnings
+
+        from airflow.exceptions import RemovedInAirflow3Warning
+        from airflow.models.errors import ParseImportError
+
+        warnings.warn(
+            f"Import '{__name__}.ImportError' is deprecated because it shadows the builtin "
+            f"exception ImportError; it will be removed in the future. "
" + f"Please consider to use '{ParseImportError.__module__}.ParseImportError' instead.", + RemovedInAirflow3Warning, + stacklevel=2, + ) + val = ParseImportError - val = import_string(f"{path}.{name}") # Store for next time globals()[name] = val return val @@ -94,7 +110,6 @@ def __getattr__(name): "DagTag": "airflow.models.dag", "DagWarning": "airflow.models.dagwarning", "DbCallbackRequest": "airflow.models.db_callback_request", - "ImportError": "airflow.models.errors", "Log": "airflow.models.log", "MappedOperator": "airflow.models.mappedoperator", "Operator": "airflow.models.operator", @@ -125,7 +140,6 @@ def __getattr__(name): from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning from airflow.models.db_callback_request import DbCallbackRequest - from airflow.models.errors import ImportError from airflow.models.log import Log from airflow.models.mappedoperator import MappedOperator from airflow.models.operator import Operator diff --git a/airflow/models/errors.py b/airflow/models/errors.py index ed76c6c35501b..f891b03d67a15 100644 --- a/airflow/models/errors.py +++ b/airflow/models/errors.py @@ -17,13 +17,16 @@ # under the License. from __future__ import annotations +import warnings + from sqlalchemy import Column, Integer, String, Text +from airflow.exceptions import RemovedInAirflow3Warning from airflow.models.base import Base from airflow.utils.sqlalchemy import UtcDateTime -class ImportError(Base): +class ParseImportError(Base): """Stores all Import Errors which are recorded when parsing DAGs and displayed on the Webserver.""" __tablename__ = "import_error" @@ -32,3 +35,17 @@ class ImportError(Base): filename = Column(String(1024)) stacktrace = Column(Text) processor_subdir = Column(String(2000), nullable=True) + + +def __getattr__(name: str): + # PEP-562: Lazy loaded attributes on python modules + if name == "ImportError": + warnings.warn( + f"Model class '{__name__}.ImportError' is deprecated due to shadowing with builtin exception " + f"ImportError and will be removed in the future. 
" + f"Please consider to use '{__name__}.ParseImportError' instead.", + RemovedInAirflow3Warning, + stacklevel=2, + ) + return ParseImportError + raise AttributeError(f"module {__name__} has no attribute {name}") diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 995833d6d651d..513b453006fd1 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -40,9 +40,9 @@ from airflow.configuration import conf from airflow.exceptions import RemovedInAirflow3Warning -from airflow.models import errors from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning +from airflow.models.errors import ParseImportError from airflow.models.taskinstance import TaskInstance from airflow.utils import timezone from airflow.utils.code_utils import get_python_source @@ -196,7 +196,7 @@ def encode_dag_run( def check_import_errors(fileloc, session): # Check dag import errors import_errors = session.scalars( - select(errors.ImportError).where(errors.ImportError.filename == fileloc) + select(ParseImportError).where(ParseImportError.filename == fileloc) ).all() if import_errors: for import_error in import_errors: diff --git a/airflow/www/views.py b/airflow/www/views.py index 5bdebe5d91632..328312658bd18 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -101,10 +101,11 @@ from airflow.jobs.job import Job from airflow.jobs.scheduler_job_runner import SchedulerJobRunner from airflow.jobs.triggerer_job_runner import TriggererJobRunner -from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, Trigger, XCom, errors +from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, Trigger, XCom from airflow.models.dag import get_dataset_triggered_next_run_info from airflow.models.dagrun import RUN_ID_REGEX, DagRun, DagRunType from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel +from airflow.models.errors import ParseImportError from airflow.models.operator import needs_expansion from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import TaskInstance, TaskInstanceNote @@ -945,13 +946,13 @@ def index(self): owner_links_dict = DagOwnerAttributes.get_all(session) if get_auth_manager().is_authorized_view(access_view=AccessView.IMPORT_ERRORS): - import_errors = select(errors.ImportError).order_by(errors.ImportError.id) + import_errors = select(ParseImportError).order_by(ParseImportError.id) can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET") if not can_read_all_dags: # if the user doesn't have access to all DAGs, only display errors from visible DAGs import_errors = import_errors.where( - errors.ImportError.filename.in_( + ParseImportError.filename.in_( select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(filter_dag_ids)) ) ) diff --git a/pyproject.toml b/pyproject.toml index bb4cf71d801f2..05971e7c4a6f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -434,6 +434,8 @@ banned-module-level-imports = ["numpy", "pandas"] "airflow.PY312".msg = "Use sys.version_info >= (3, 12) instead." 
 # Deprecated imports
 "airflow.models.baseoperator.BaseOperatorLink".msg = "Use airflow.models.baseoperatorlink.BaseOperatorLink"
+"airflow.models.errors.ImportError".msg = "Use airflow.models.errors.ParseImportError"
+"airflow.models.ImportError".msg = "Use airflow.models.errors.ParseImportError"
 # Deprecated in Python 3.11, Pending Removal in Python 3.15: https://github.com/python/cpython/issues/90817
 # Deprecation warning in Python 3.11 also recommends using locale.getencoding but it available in Python 3.11
 "locale.getdefaultlocale".msg = "Use locale.setlocale() and locale.getlocale() instead."
diff --git a/tests/api_connexion/endpoints/test_import_error_endpoint.py b/tests/api_connexion/endpoints/test_import_error_endpoint.py
index fae1312a32058..ce084165d24e2 100644
--- a/tests/api_connexion/endpoints/test_import_error_endpoint.py
+++ b/tests/api_connexion/endpoints/test_import_error_endpoint.py
@@ -22,7 +22,7 @@
 
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
 from airflow.models.dag import DagModel
-from airflow.models.errors import ImportError
+from airflow.models.errors import ParseImportError
 from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
@@ -95,7 +95,7 @@ def _normalize_import_errors(import_errors):
 class TestGetImportErrorEndpoint(TestBaseImportError):
     def test_response_200(self, session):
-        import_error = ImportError(
+        import_error = ParseImportError(
             filename="Lorem_ipsum.py",
             stacktrace="Lorem ipsum",
             timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -128,7 +128,7 @@ def test_response_404(self):
         } == response.json
 
     def test_should_raises_401_unauthenticated(self, session):
-        import_error = ImportError(
+        import_error = ParseImportError(
             filename="Lorem_ipsum.py",
             stacktrace="Lorem ipsum",
             timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -147,7 +147,7 @@ def test_should_raise_403_forbidden(self):
         assert response.status_code == 403
 
     def test_should_raise_403_forbidden_without_dag_read(self, session):
-        import_error = ImportError(
+        import_error = ParseImportError(
             filename="Lorem_ipsum.py",
             stacktrace="Lorem ipsum",
             timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -164,7 +164,7 @@ def test_should_raise_403_forbidden_without_dag_read(self, session):
     def test_should_return_200_with_single_dag_read(self, session):
         dag_model = DagModel(dag_id=TEST_DAG_IDS[0], fileloc="Lorem_ipsum.py")
         session.add(dag_model)
-        import_error = ImportError(
+        import_error = ParseImportError(
             filename="Lorem_ipsum.py",
             stacktrace="Lorem ipsum",
             timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -190,7 +190,7 @@ def test_should_return_200_redacted_with_single_dag_read_in_dagfile(self, sessio
         for dag_id in TEST_DAG_IDS:
             dag_model = DagModel(dag_id=dag_id, fileloc="Lorem_ipsum.py")
             session.add(dag_model)
-        import_error = ImportError(
+        import_error = ParseImportError(
             filename="Lorem_ipsum.py",
             stacktrace="Lorem ipsum",
             timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -216,7 +216,7 @@ class TestGetImportErrorsEndpoint(TestBaseImportError):
     def test_get_import_errors(self, session):
         import_error = [
-            ImportError(
+            ParseImportError(
                 filename=f"Lorem_ipsum{i}.py",
                 stacktrace="Lorem ipsum",
                 timestamp=timezone.parse(self.timestamp, timezone="UTC") + timedelta(days=-i),
@@ -288,7 +288,7 @@ def test_get_import_errors_order_by(self, session):
 
     def test_order_by_raises_400_for_invalid_attr(self, session):
         import_error = [
-            ImportError(
+            ParseImportError(
                 filename="Lorem_ipsum.py",
                 stacktrace="Lorem ipsum",
                 timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -308,7 +308,7 @@ def test_order_by_raises_400_for_invalid_attr(self, session):
 
     def test_should_raises_401_unauthenticated(self, session):
         import_error = [
-            ImportError(
+            ParseImportError(
                 filename="Lorem_ipsum.py",
                 stacktrace="Lorem ipsum",
                 timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -327,7 +327,7 @@ def test_get_import_errors_single_dag(self, session):
             fake_filename = f"/tmp/{dag_id}.py"
             dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename)
             session.add(dag_model)
-            importerror = ImportError(
+            importerror = ParseImportError(
                 filename=fake_filename,
                 stacktrace="Lorem ipsum",
                 timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -360,7 +360,7 @@ def test_get_import_errors_single_dag_in_dagfile(self, session):
             dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename)
             session.add(dag_model)
 
-        importerror = ImportError(
+        importerror = ParseImportError(
             filename="/tmp/all_in_one.py",
             stacktrace="Lorem ipsum",
             timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -405,7 +405,7 @@ class TestGetImportErrorsEndpointPagination(TestBaseImportError):
     @provide_session
     def test_limit_and_offset(self, url, expected_import_error_ids, session):
         import_errors = [
-            ImportError(
+            ParseImportError(
                 filename=f"/tmp/file_{i}.py",
                 stacktrace="Lorem ipsum",
                 timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -423,7 +423,7 @@ def test_limit_and_offset(self, url, expected_import_error_ids, session):
 
     def test_should_respect_page_size_limit_default(self, session):
         import_errors = [
-            ImportError(
+            ParseImportError(
                 filename=f"/tmp/file_{i}.py",
                 stacktrace="Lorem ipsum",
                 timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -439,7 +439,7 @@ def test_should_respect_page_size_limit_default(self, session):
     @conf_vars({("api", "maximum_page_limit"): "150"})
     def test_should_return_conf_max_if_req_max_above_conf(self, session):
         import_errors = [
-            ImportError(
+            ParseImportError(
                 filename=f"/tmp/file_{i}.py",
                 stacktrace="Lorem ipsum",
                 timestamp=timezone.parse(self.timestamp, timezone="UTC"),
diff --git a/tests/api_connexion/schemas/test_error_schema.py b/tests/api_connexion/schemas/test_error_schema.py
index 5cb6873a9eaa6..7056417e66098 100644
--- a/tests/api_connexion/schemas/test_error_schema.py
+++ b/tests/api_connexion/schemas/test_error_schema.py
@@ -23,7 +23,7 @@
     import_error_collection_schema,
     import_error_schema,
 )
-from airflow.models.errors import ImportError
+from airflow.models.errors import ParseImportError
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
 from tests.test_utils.db import clear_db_import_errors
@@ -43,7 +43,7 @@ def teardown_method(self) -> None:
 class TestErrorSchema(TestErrorSchemaBase):
     @provide_session
     def test_serialize(self, session):
-        import_error = ImportError(
+        import_error = ParseImportError(
             filename="lorem.py",
             stacktrace="Lorem Ipsum",
             timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -64,7 +64,7 @@ class TestErrorCollectionSchema(TestErrorSchemaBase):
     @provide_session
     def test_serialize(self, session):
         import_error = [
-            ImportError(
+            ParseImportError(
                 filename="Lorem_ipsum.py",
                 stacktrace="Lorem ipsum",
                 timestamp=timezone.parse(self.timestamp, timezone="UTC"),
@@ -73,7 +73,7 @@ def test_serialize(self, session):
         ]
         session.add_all(import_error)
         session.commit()
-        query = session.query(ImportError)
+        query = session.query(ParseImportError)
         query_list = query.all()
         serialized_data = import_error_collection_schema.dump(
             ImportErrorCollection(import_errors=query_list, total_entries=2)
diff --git a/tests/api_experimental/common/test_delete_dag.py b/tests/api_experimental/common/test_delete_dag.py
index a97e6e2bca819..9fa98a4ffa6ae 100644
--- a/tests/api_experimental/common/test_delete_dag.py
+++ b/tests/api_experimental/common/test_delete_dag.py
@@ -23,7 +23,7 @@
 from airflow.exceptions import AirflowException, DagNotFound
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagrun import DagRun as DR
-from airflow.models.errors import ImportError as IE
+from airflow.models.errors import ParseImportError as IE
 from airflow.models.log import Log
 from airflow.models.taskfail import TaskFail
 from airflow.models.taskinstance import TaskInstance as TI
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index 683994f4107c4..e35e2fb97f90c 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -52,8 +52,9 @@
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
 from airflow.jobs.job import Job
-from airflow.models import DagBag, DagModel, DbCallbackRequest, errors
+from airflow.models import DagBag, DagModel, DbCallbackRequest
 from airflow.models.dagcode import DagCode
+from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import timezone
 from airflow.utils.net import get_hostname
@@ -173,14 +174,14 @@ def test_remove_file_clears_import_error(self, tmp_path):
 
         with create_session() as session:
             self.run_processor_manager_one_loop(manager, parent_pipe)
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
             assert len(import_errors) == 1
 
             path_to_parse.unlink()
 
             # Rerun the scheduler once the dag file has been removed
             self.run_processor_manager_one_loop(manager, parent_pipe)
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
 
             assert len(import_errors) == 0
             session.rollback()
@@ -847,7 +848,7 @@ def test_import_error_with_dag_directory(self, tmp_path):
 
             self.run_processor_manager_one_loop(manager, parent_pipe)
 
-            import_errors = session.query(errors.ImportError).order_by("id").all()
+            import_errors = session.query(ParseImportError).order_by("id").all()
             assert len(import_errors) == 1
             assert import_errors[0].processor_subdir == str(processor_dir_1)
 
@@ -868,7 +869,7 @@ def test_import_error_with_dag_directory(self, tmp_path):
 
             self.run_processor_manager_one_loop(manager, parent_pipe)
 
-            import_errors = session.query(errors.ImportError).order_by("id").all()
+            import_errors = session.query(ParseImportError).order_by("id").all()
             assert len(import_errors) == 2
             assert import_errors[0].processor_subdir == str(processor_dir_1)
             assert import_errors[1].processor_subdir == str(processor_dir_2)
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index a2b1eb16049f9..09b639806b161 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -31,7 +31,8 @@
 from airflow.configuration import TEST_DAGS_FOLDER, conf
 from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.dag_processing.processor import DagFileProcessor, DagFileProcessorProcess
-from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance, errors
+from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance
+from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.operators.empty import EmptyOperator
@@ -604,7 +605,7 @@ def test_add_unparseable_file_before_sched_start_creates_import_error(self, tmp_
 
         with create_session() as session:
             self._process_file(unparseable_filename, dag_directory=tmp_path, session=session)
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
 
             assert len(import_errors) == 1
             import_error = import_errors[0]
@@ -621,7 +622,7 @@ def test_add_unparseable_zip_file_creates_import_error(self, tmp_path):
 
         with create_session() as session:
             self._process_file(zip_filename, dag_directory=tmp_path, session=session)
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
 
             assert len(import_errors) == 1
             import_error = import_errors[0]
@@ -646,7 +647,7 @@ def test_dag_model_has_import_error_is_true_when_import_error_exists(self, tmp_p
                 file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
             self._process_file(temp_dagfile, dag_directory=tmp_path, session=session)
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
 
             assert len(import_errors) == 1
             import_error = import_errors[0]
@@ -661,7 +662,7 @@ def test_no_import_errors_with_parseable_dag(self, tmp_path):
 
         with create_session() as session:
             self._process_file(parseable_filename.as_posix(), dag_directory=tmp_path, session=session)
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
 
             assert len(import_errors) == 0
 
@@ -674,7 +675,7 @@ def test_no_import_errors_with_parseable_dag_in_zip(self, tmp_path):
 
         with create_session() as session:
             self._process_file(zip_filename, dag_directory=tmp_path, session=session)
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
 
             assert len(import_errors) == 0
 
@@ -695,7 +696,7 @@ def test_new_import_error_replaces_old(self, tmp_path):
             )
             self._process_file(unparseable_filename.as_posix(), dag_directory=tmp_path, session=session)
 
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
 
             assert len(import_errors) == 1
             import_error = import_errors[0]
@@ -717,7 +718,7 @@ def test_import_error_record_is_updated_not_deleted_and_recreated(self, tmp_path
         self._process_file(filename_to_parse, dag_directory=tmp_path, session=session)
 
         import_error_1 = (
-            session.query(errors.ImportError).filter(errors.ImportError.filename == filename_to_parse).one()
+            session.query(ParseImportError).filter(ParseImportError.filename == filename_to_parse).one()
         )
 
         # process the file multiple times
@@ -725,7 +726,7 @@ def test_import_error_record_is_updated_not_deleted_and_recreated(self, tmp_path
             self._process_file(filename_to_parse, dag_directory=tmp_path, session=session)
 
         import_error_2 = (
-            session.query(errors.ImportError).filter(errors.ImportError.filename == filename_to_parse).one()
+            session.query(ParseImportError).filter(ParseImportError.filename == filename_to_parse).one()
         )
 
         # assert that the ID of the import error did not change
@@ -745,7 +746,7 @@ def test_remove_error_clears_import_error(self, tmp_path):
             file_to_parse.writelines(PARSEABLE_DAG_FILE_CONTENTS)
         self._process_file(filename_to_parse, dag_directory=tmp_path, session=session)
 
-        import_errors = session.query(errors.ImportError).all()
+        import_errors = session.query(ParseImportError).all()
 
         assert len(import_errors) == 0
 
@@ -760,7 +761,7 @@ def test_remove_error_clears_import_error_zip(self, tmp_path):
             zip_file.writestr(TEMP_DAG_FILENAME, UNPARSEABLE_DAG_FILE_CONTENTS)
         self._process_file(zip_filename, dag_directory=tmp_path, session=session)
 
-        import_errors = session.query(errors.ImportError).all()
+        import_errors = session.query(ParseImportError).all()
         assert len(import_errors) == 1
 
         # Remove the import error from the file
@@ -768,7 +769,7 @@ def test_remove_error_clears_import_error_zip(self, tmp_path):
             zip_file.writestr(TEMP_DAG_FILENAME, "import os # airflow DAG")
         self._process_file(zip_filename, dag_directory=tmp_path, session=session)
 
-        import_errors = session.query(errors.ImportError).all()
+        import_errors = session.query(ParseImportError).all()
         assert len(import_errors) == 0
 
         session.rollback()
@@ -780,7 +781,7 @@ def test_import_error_tracebacks(self, tmp_path):
 
         with create_session() as session:
             self._process_file(unparseable_filename, dag_directory=tmp_path, session=session)
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
 
             assert len(import_errors) == 1
             import_error = import_errors[0]
@@ -817,7 +818,7 @@ def test_import_error_traceback_depth(self, tmp_path):
 
         with create_session() as session:
             self._process_file(unparseable_filename, dag_directory=tmp_path, session=session)
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
 
             assert len(import_errors) == 1
             import_error = import_errors[0]
@@ -849,7 +850,7 @@ def test_import_error_tracebacks_zip(self, tmp_path):
 
         with create_session() as session:
             self._process_file(invalid_zip_filename, dag_directory=tmp_path, session=session)
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
 
             assert len(import_errors) == 1
             import_error = import_errors[0]
@@ -887,7 +888,7 @@ def test_import_error_tracebacks_zip_depth(self, tmp_path):
 
         with create_session() as session:
             self._process_file(invalid_zip_filename, dag_directory=tmp_path, session=session)
-            import_errors = session.query(errors.ImportError).all()
+            import_errors = session.query(ParseImportError).all()
 
             assert len(import_errors) == 1
             import_error = import_errors[0]
diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py
index 55583d019412c..900598e20f296 100644
--- a/tests/test_utils/db.py
+++ b/tests/test_utils/db.py
@@ -34,7 +34,6 @@
     Trigger,
     Variable,
     XCom,
-    errors,
 )
 from airflow.models.dag import DagOwnerAttributes
 from airflow.models.dagcode import DagCode
@@ -46,6 +45,7 @@
     DatasetModel,
     TaskOutletDatasetReference,
 )
+from airflow.models.errors import ParseImportError
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.providers.fab.auth_manager.models import Permission, Resource, assoc_permission_role
 from airflow.security.permissions import RESOURCE_DAG_PREFIX
@@ -136,7 +136,7 @@ def clear_rendered_ti_fields():
 
 def clear_db_import_errors():
     with create_session() as session:
-        session.query(errors.ImportError).delete()
+        session.query(ParseImportError).delete()
 
 
 def clear_db_dag_warnings():
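
Migration sketch for downstream code (assumes only what this patch shows: the
model lives in airflow.models.errors under the new name, and under the old name
on releases that predate this change):

    # Prefer the new model name; fall back on older Airflow releases.
    try:
        from airflow.models.errors import ParseImportError
    except ImportError:  # "ImportError" caught here is the *builtin* exception
        from airflow.models.errors import ImportError as ParseImportError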
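The PEP-562 module-level __getattr__ shims above keep the old attribute
importable while emitting a deprecation warning. A sketch of the expected
behavior (assumes an Airflow build containing this patch; in airflow.exceptions,
RemovedInAirflow3Warning is a DeprecationWarning subclass):

    import warnings

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        # The first attribute access goes through the module __getattr__,
        # warns, and caches the value in the module globals.
        from airflow.models import ImportError as LegacyImportError

    from airflow.models.errors import ParseImportError

    assert LegacyImportError is ParseImportError
    assert any("ParseImportError" in str(w.message) for w in caught)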