diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index 85631e0965078..76ae3e36ba111 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -136,7 +136,7 @@ "dag": { "type": "object", "properties": { - "params": { "$ref": "#/definitions/params_dict" }, + "params": { "$ref": "#/definitions/params" }, "_dag_id": { "type": "string" }, "tasks": { "$ref": "#/definitions/tasks" }, "timezone": { "$ref": "#/definitions/timezone" }, @@ -206,9 +206,13 @@ "type": "array", "additionalProperties": { "$ref": "#/definitions/operator" } }, - "params_dict": { - "type": "object", - "additionalProperties": {"$ref": "#/definitions/param" } + "params": { + "type": "array", + "prefixItems": [ + { "type": "string" }, + { "$ref": "#/definitions/param" } + ], + "unevaluatedItems": false }, "param": { "$comment": "A param for a dag / operator", @@ -258,7 +262,7 @@ "retry_delay": { "$ref": "#/definitions/timedelta" }, "retry_exponential_backoff": { "type": "boolean" }, "max_retry_delay": { "$ref": "#/definitions/timedelta" }, - "params": { "$ref": "#/definitions/params_dict" }, + "params": { "$ref": "#/definitions/params" }, "priority_weight": { "type": "number" }, "weight_rule": { "type": "string" }, "executor": { "type": "string" }, diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index eb9c15f43cdcd..a73684b9a524b 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -827,9 +827,9 @@ def is_serialized(val): return class_(**kwargs) @classmethod - def _serialize_params_dict(cls, params: ParamsDict | dict): - """Serialize Params dict for a DAG or task.""" - serialized_params = {} + def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, dict]]: + """Serialize Params dict for a DAG or task as a list of tuples to ensure ordering.""" + serialized_params = [] for k, v in params.items(): # TODO: As of now, we would allow serialization of params which are of type Param only. try: @@ -837,7 +837,7 @@ def _serialize_params_dict(cls, params: ParamsDict | dict): except AttributeError: class_identity = "" if class_identity == "airflow.models.param.Param": - serialized_params[k] = cls._serialize_param(v) + serialized_params.append((k, cls._serialize_param(v))) else: raise ValueError( f"Params to a DAG or a Task can be only of type airflow.models.param.Param, " @@ -846,10 +846,16 @@ def _serialize_params_dict(cls, params: ParamsDict | dict): return serialized_params @classmethod - def _deserialize_params_dict(cls, encoded_params: dict) -> ParamsDict: + def _deserialize_params_dict(cls, encoded_params: list[tuple[str, dict]]) -> ParamsDict: """Deserialize a DAG's Params dict.""" + if isinstance(encoded_params, collections.abc.Mapping): + # in 2.9.2 or earlier params were serialized as JSON objects + encoded_param_pairs: Iterable[tuple[str, dict]] = encoded_params.items() + else: + encoded_param_pairs = encoded_params + op_params = {} - for k, v in encoded_params.items(): + for k, v in encoded_param_pairs: if isinstance(v, dict) and "__class" in v: op_params[k] = cls._deserialize_param(v) else: diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index c46d4e18a0734..848d26119506e 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -206,6 +206,22 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields): expected_dependencies = {dag_id: [] for dag_id in example_dags} assert SDM.get_dag_dependencies() == expected_dependencies + def test_order_of_dag_params_is_stable(self): + """ + This asserts that we have logic in place which guarantees the order + of the params is maintained - even if the backend (e.g. MySQL) mutates + the serialized DAG JSON. + """ + example_dags = make_example_dags(example_dags_module) + example_params_trigger_ui = example_dags.get("example_params_trigger_ui") + before = list(example_params_trigger_ui.params.keys()) + + SDM.write_dag(example_params_trigger_ui) + retrieved_dag = SDM.get_dag("example_params_trigger_ui") + after = list(retrieved_dag.params.keys()) + + assert before == after + def test_order_of_deps_is_consistent(self): """ Previously the 'dag_dependencies' node in serialized dag was converted to list from set. diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 16f6d3cb68fe6..c0e1d69c09770 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -245,7 +245,7 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i }, "edge_info": {}, "dag_dependencies": [], - "params": {}, + "params": [], }, } @@ -2082,6 +2082,25 @@ def test_params_upgrade(self): assert isinstance(dag.params.get_param("none"), Param) assert dag.params["str"] == "str" + def test_params_serialization_from_dict_upgrade(self): + """In <=2.9.2 params were serialized as a JSON object instead of a list of key-value pairs. + This test asserts that the params are still deserialized properly.""" + serialized = { + "__version": 1, + "dag": { + "_dag_id": "simple_dag", + "fileloc": "/path/to/file.py", + "tasks": [], + "timezone": "UTC", + "params": {"my_param": {"__class": "airflow.models.param.Param", "default": "str"}}, + }, + } + dag = SerializedDAG.from_dict(serialized) + + param = dag.params.get_param("my_param") + assert isinstance(param, Param) + assert param.value == "str" + def test_params_serialize_default_2_2_0(self): """In 2.0.0, param ``default`` was assumed to be json-serializable objects and were not run though the standard serializer function. In 2.2.2 we serialize param ``default``. We keep this @@ -2093,7 +2112,7 @@ def test_params_serialize_default_2_2_0(self): "fileloc": "/path/to/file.py", "tasks": [], "timezone": "UTC", - "params": {"str": {"__class": "airflow.models.param.Param", "default": "str"}}, + "params": [["str", {"__class": "airflow.models.param.Param", "default": "str"}]], }, } SerializedDAG.validate_schema(serialized) @@ -2110,14 +2129,17 @@ def test_params_serialize_default(self): "fileloc": "/path/to/file.py", "tasks": [], "timezone": "UTC", - "params": { - "my_param": { - "default": "a string value", - "description": "hello", - "schema": {"__var": {"type": "string"}, "__type": "dict"}, - "__class": "airflow.models.param.Param", - } - }, + "params": [ + [ + "my_param", + { + "default": "a string value", + "description": "hello", + "schema": {"__var": {"type": "string"}, "__type": "dict"}, + "__class": "airflow.models.param.Param", + }, + ] + ], }, } SerializedDAG.validate_schema(serialized)