Skip to content

Commit

Permalink
Rename model ImportError to ParseImportError for avoid shadowing …
Browse files Browse the repository at this point in the history
…with builtin exception (apache#39116)
  • Loading branch information
Taragolis authored Apr 19, 2024
1 parent eee17f0 commit a6f612d
Show file tree
Hide file tree
Showing 16 changed files with 116 additions and 78 deletions.
5 changes: 3 additions & 2 deletions airflow/api/common/delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from airflow import models
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DagModel, TaskFail
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.db import get_sqla_model_classes
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down Expand Up @@ -99,8 +100,8 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
# Delete entries in Import Errors table for a deleted DAG
# This handles the case when the dag_id is changed in the file
session.execute(
delete(models.ImportError)
.where(models.ImportError.filename == dag.fileloc)
delete(ParseImportError)
.where(ParseImportError.filename == dag.fileloc)
.execution_options(synchronize_session="fetch")
)

Expand Down
12 changes: 6 additions & 6 deletions airflow/api_connexion/endpoints/import_error_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
)
from airflow.auth.managers.models.resource_details import AccessView, DagDetails
from airflow.models.dag import DagModel
from airflow.models.errors import ImportError as ImportErrorModel
from airflow.models.errors import ParseImportError
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.www.extensions.init_auth_manager import get_auth_manager

Expand All @@ -45,7 +45,7 @@
@provide_session
def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION) -> APIResponse:
"""Get an import error."""
error = session.get(ImportErrorModel, import_error_id)
error = session.get(ParseImportError, import_error_id)
if error is None:
raise NotFound(
"Import error not found",
Expand Down Expand Up @@ -85,8 +85,8 @@ def get_import_errors(
"""Get all import errors."""
to_replace = {"import_error_id": "id"}
allowed_sort_attrs = ["import_error_id", "timestamp", "filename"]
count_query = select(func.count(ImportErrorModel.id))
query = select(ImportErrorModel)
count_query = select(func.count(ParseImportError.id))
query = select(ParseImportError)
query = apply_sorting(query, order_by, to_replace, allowed_sort_attrs)

can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
Expand All @@ -95,8 +95,8 @@ def get_import_errors(
# if the user doesn't have access to all DAGs, only display errors from visible DAGs
readable_dag_ids = security.get_readable_dags()
dagfiles_stmt = select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(readable_dag_ids))
query = query.where(ImportErrorModel.filename.in_(dagfiles_stmt))
count_query = count_query.where(ImportErrorModel.filename.in_(dagfiles_stmt))
query = query.where(ParseImportError.filename.in_(dagfiles_stmt))
count_query = count_query.where(ParseImportError.filename.in_(dagfiles_stmt))

total_entries = session.scalars(count_query).one()
import_errors = session.scalars(query.offset(offset).limit(limit)).all()
Expand Down
6 changes: 3 additions & 3 deletions airflow/api_connexion/schemas/error_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from marshmallow import Schema, fields
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field

from airflow.models.errors import ImportError
from airflow.models.errors import ParseImportError


class ImportErrorSchema(SQLAlchemySchema):
Expand All @@ -30,7 +30,7 @@ class ImportErrorSchema(SQLAlchemySchema):
class Meta:
"""Meta."""

model = ImportError
model = ParseImportError

import_error_id = auto_field("id", dump_only=True)
timestamp = auto_field(format="iso", dump_only=True)
Expand All @@ -41,7 +41,7 @@ class Meta:
class ImportErrorCollection(NamedTuple):
"""List of import errors with metadata."""

import_errors: list[ImportError]
import_errors: list[ParseImportError]
total_entries: int


Expand Down
8 changes: 4 additions & 4 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@
from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.models import errors
from airflow.models.dag import DagModel
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.secrets.cache import SecretCache
from airflow.stats import Stats
Expand Down Expand Up @@ -803,12 +803,12 @@ def clear_nonexistent_import_errors(
:param file_paths: list of paths to DAG definition files
:param session: session for ORM operations
"""
query = delete(errors.ImportError)
query = delete(ParseImportError)

if file_paths:
query = query.where(
~errors.ImportError.filename.in_(file_paths),
errors.ImportError.processor_subdir == processor_subdir,
~ParseImportError.filename.in_(file_paths),
ParseImportError.processor_subdir == processor_subdir,
)

session.execute(query.execution_options(synchronize_session="fetch"))
Expand Down
13 changes: 7 additions & 6 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@
)
from airflow.configuration import conf
from airflow.exceptions import AirflowException, TaskNotFound
from airflow.models import SlaMiss, errors
from airflow.models import SlaMiss
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun as DR
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstance as TI
from airflow.stats import Stats
Expand Down Expand Up @@ -613,24 +614,24 @@ def update_import_errors(
# that no longer have errors
for dagbag_file in files_without_error:
session.execute(
delete(errors.ImportError)
.where(errors.ImportError.filename.startswith(dagbag_file))
delete(ParseImportError)
.where(ParseImportError.filename.startswith(dagbag_file))
.execution_options(synchronize_session="fetch")
)

# files that still have errors
existing_import_error_files = [x.filename for x in session.query(errors.ImportError.filename).all()]
existing_import_error_files = [x.filename for x in session.query(ParseImportError.filename).all()]

# Add the errors of the processed files
for filename, stacktrace in import_errors.items():
if filename in existing_import_error_files:
session.query(errors.ImportError).filter(errors.ImportError.filename == filename).update(
session.query(ParseImportError).filter(ParseImportError.filename == filename).update(
{"filename": filename, "timestamp": timezone.utcnow(), "stacktrace": stacktrace},
synchronize_session="fetch",
)
else:
session.add(
errors.ImportError(
ParseImportError(
filename=filename,
timestamp=timezone.utcnow(),
stacktrace=stacktrace,
Expand Down
30 changes: 22 additions & 8 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
"DagRun",
"DagTag",
"DbCallbackRequest",
"ImportError",
"Log",
"MappedOperator",
"Operator",
Expand Down Expand Up @@ -62,19 +61,36 @@ def import_all_models():

import airflow.models.dagwarning
import airflow.models.dataset
import airflow.models.errors
import airflow.models.serialized_dag
import airflow.models.tasklog


def __getattr__(name):
# PEP-562: Lazy loaded attributes on python modules
path = __lazy_imports.get(name)
if not path:
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
if name != "ImportError":
path = __lazy_imports.get(name)
if not path:
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

from airflow.utils.module_loading import import_string
from airflow.utils.module_loading import import_string

val = import_string(f"{path}.{name}")
else:
import warnings

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models.errors import ParseImportError

warnings.warn(
f"Import '{__name__}.ImportError' is deprecated due to shadowing with builtin exception "
f"ImportError and will be removed in the future. "
f"Please consider to use '{ParseImportError.__module__}.ParseImportError' instead.",
RemovedInAirflow3Warning,
stacklevel=2,
)
val = ParseImportError

val = import_string(f"{path}.{name}")
# Store for next time
globals()[name] = val
return val
Expand All @@ -94,7 +110,6 @@ def __getattr__(name):
"DagTag": "airflow.models.dag",
"DagWarning": "airflow.models.dagwarning",
"DbCallbackRequest": "airflow.models.db_callback_request",
"ImportError": "airflow.models.errors",
"Log": "airflow.models.log",
"MappedOperator": "airflow.models.mappedoperator",
"Operator": "airflow.models.operator",
Expand Down Expand Up @@ -125,7 +140,6 @@ def __getattr__(name):
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ImportError
from airflow.models.log import Log
from airflow.models.mappedoperator import MappedOperator
from airflow.models.operator import Operator
Expand Down
19 changes: 18 additions & 1 deletion airflow/models/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
# under the License.
from __future__ import annotations

import warnings

from sqlalchemy import Column, Integer, String, Text

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models.base import Base
from airflow.utils.sqlalchemy import UtcDateTime


class ImportError(Base):
class ParseImportError(Base):
"""Stores all Import Errors which are recorded when parsing DAGs and displayed on the Webserver."""

__tablename__ = "import_error"
Expand All @@ -32,3 +35,17 @@ class ImportError(Base):
filename = Column(String(1024))
stacktrace = Column(Text)
processor_subdir = Column(String(2000), nullable=True)


def __getattr__(name: str):
# PEP-562: Lazy loaded attributes on python modules
if name == "ImportError":
warnings.warn(
f"Model class '{__name__}.ImportError' is deprecated due to shadowing with builtin exception "
f"ImportError and will be removed in the future. "
f"Please consider to use '{__name__}.ParseImportError' instead.",
RemovedInAirflow3Warning,
stacklevel=2,
)
return ParseImportError
raise AttributeError(f"module {__name__} has no attribute {name}")
4 changes: 2 additions & 2 deletions airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@

from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import errors
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
from airflow.models.errors import ParseImportError
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.code_utils import get_python_source
Expand Down Expand Up @@ -196,7 +196,7 @@ def encode_dag_run(
def check_import_errors(fileloc, session):
# Check dag import errors
import_errors = session.scalars(
select(errors.ImportError).where(errors.ImportError.filename == fileloc)
select(ParseImportError).where(ParseImportError.filename == fileloc)
).all()
if import_errors:
for import_error in import_errors:
Expand Down
7 changes: 4 additions & 3 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@
from airflow.jobs.job import Job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, Trigger, XCom, errors
from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, Trigger, XCom
from airflow.models.dag import get_dataset_triggered_next_run_info
from airflow.models.dagrun import RUN_ID_REGEX, DagRun, DagRunType
from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.models.errors import ParseImportError
from airflow.models.operator import needs_expansion
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstanceNote
Expand Down Expand Up @@ -945,13 +946,13 @@ def index(self):
owner_links_dict = DagOwnerAttributes.get_all(session)

if get_auth_manager().is_authorized_view(access_view=AccessView.IMPORT_ERRORS):
import_errors = select(errors.ImportError).order_by(errors.ImportError.id)
import_errors = select(ParseImportError).order_by(ParseImportError.id)

can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
if not can_read_all_dags:
# if the user doesn't have access to all DAGs, only display errors from visible DAGs
import_errors = import_errors.where(
errors.ImportError.filename.in_(
ParseImportError.filename.in_(
select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(filter_dag_ids))
)
)
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ banned-module-level-imports = ["numpy", "pandas"]
"airflow.PY312".msg = "Use sys.version_info >= (3, 12) instead."
# Deprecated imports
"airflow.models.baseoperator.BaseOperatorLink".msg = "Use airflow.models.baseoperatorlink.BaseOperatorLink"
"airflow.models.errors.ImportError".msg = "Use airflow.models.errors.ParseImportError"
"airflow.models.ImportError".msg = "Use airflow.models.errors.ParseImportError"
# Deprecated in Python 3.11, Pending Removal in Python 3.15: https://github.com/python/cpython/issues/90817
# Deprecation warning in Python 3.11 also recommends using locale.getencoding but it available in Python 3.11
"locale.getdefaultlocale".msg = "Use locale.setlocale() and locale.getlocale() instead."
Expand Down
Loading

0 comments on commit a6f612d

Please sign in to comment.