From 007e8876b3484f5d743c2c78ee4b23c9ffbb3dc1 Mon Sep 17 00:00:00 2001 From: Jens Scheffler <95105677+jscheffl@users.noreply.github.com> Date: Fri, 13 Dec 2024 23:13:33 +0100 Subject: [PATCH] Revert removal of Pydantic model support from PR 44552 to restore compatibility with Airflow 2.10 (#44921) --- providers/src/airflow/providers/edge/CHANGELOG.rst | 8 ++++++++ providers/src/airflow/providers/edge/__init__.py | 2 +- providers/src/airflow/providers/edge/provider.yaml | 2 +- .../providers/edge/worker_api/routes/_v2_routes.py | 9 ++++++--- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst b/providers/src/airflow/providers/edge/CHANGELOG.rst index c0f153befb254..f9d9816d34acf 100644 --- a/providers/src/airflow/providers/edge/CHANGELOG.rst +++ b/providers/src/airflow/providers/edge/CHANGELOG.rst @@ -27,6 +27,14 @@ Changelog --------- +0.9.5pre0 +......... + +Misc +~~~~ + +* ``Revert removal of Pydantic model support from PR 44552 to restore compatibility with Airflow 2.10.`` + 0.9.4pre0 ......... diff --git a/providers/src/airflow/providers/edge/__init__.py b/providers/src/airflow/providers/edge/__init__.py index a9c9f6ce6d845..72aaf60f36499 100644 --- a/providers/src/airflow/providers/edge/__init__.py +++ b/providers/src/airflow/providers/edge/__init__.py @@ -29,7 +29,7 @@ __all__ = ["__version__"] -__version__ = "0.9.4pre0" +__version__ = "0.9.5pre0" if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( "2.10.0" diff --git a/providers/src/airflow/providers/edge/provider.yaml b/providers/src/airflow/providers/edge/provider.yaml index 4f3fc0daa0eae..ac64d35691bf0 100644 --- a/providers/src/airflow/providers/edge/provider.yaml +++ b/providers/src/airflow/providers/edge/provider.yaml @@ -27,7 +27,7 @@ source-date-epoch: 1729683247 # note that those versions are maintained by release manager - do not update them manually versions: - - 0.9.4pre0 + - 0.9.5pre0 dependencies: - apache-airflow>=2.10.0 diff --git a/providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py b/providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py index b4e9d4405630c..684c9dd4dae68 100644 --- a/providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py +++ b/providers/src/airflow/providers/edge/worker_api/routes/_v2_routes.py @@ -88,7 +88,8 @@ def rpcapi_v2(body: dict[str, Any]) -> APIResponse: params = {} try: if request_obj.params: - params = BaseSerialization.deserialize(request_obj.params) + # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization + params = BaseSerialization.deserialize(request_obj.params, use_pydantic_models=True) # type: ignore[call-arg] except Exception: raise error_response("Error deserializing parameters.", status.HTTP_400_BAD_REQUEST) @@ -97,13 +98,15 @@ def rpcapi_v2(body: dict[str, Any]) -> APIResponse: # Session must be created there as it may be needed by serializer for lazy-loaded fields. with create_session() as session: output = handler(**params, session=session) - output_json = BaseSerialization.serialize(output) + # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization + output_json = BaseSerialization.serialize(output, use_pydantic_models=True) # type: ignore[call-arg] log.debug( "Sending response: %s", json.dumps(output_json) if output_json is not None else None ) # In case of AirflowException or other selective known types, transport the exception class back to caller except (KeyError, AttributeError, AirflowException) as e: - output_json = BaseSerialization.serialize(e) + # Note, this is Airflow 2.10 specific, as it uses Pydantic models for serialization + output_json = BaseSerialization.serialize(e, use_pydantic_models=True) # type: ignore[call-arg] log.debug( "Sending exception response: %s", json.dumps(output_json) if output_json is not None else None )