Skip to content

Commit

Permalink
Remove XCom pickling (apache#43905)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil authored Nov 18, 2024
1 parent 49daa6c commit 6faa720
Show file tree
Hide file tree
Showing 14 changed files with 31 additions and 219 deletions.
2 changes: 1 addition & 1 deletion airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def get_xcom_entry(
stub.value = XCom.deserialize_value(stub)
item = stub

if stringify or conf.getboolean("core", "enable_xcom_pickling"):
if stringify:
return xcom_schema_string.dump(item)

return xcom_schema_native.dump(item)
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def get_xcom_entry(
xcom_stub.value = XCom.deserialize_value(xcom_stub)
item = xcom_stub

if stringify or conf.getboolean("core", "enable_xcom_pickling"):
if stringify:
return XComResponseString.model_validate(item, from_attributes=True)

return XComResponseNative.model_validate(item, from_attributes=True)
9 changes: 0 additions & 9 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,6 @@ core:
type: string
example: ~
default: "False"
enable_xcom_pickling:
description: |
Whether to enable pickling for xcom (note that this is insecure and allows for
RCE exploits).
version_added: ~
type: string
example: ~
default: "False"
see_also: "https://docs.python.org/3/library/pickle.html#comparison-with-json"
allowed_deserialization_classes:
description: |
What classes can be imported during deserialization. This is a multi line value.
Expand Down
4 changes: 1 addition & 3 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3368,9 +3368,7 @@ def xcom_push(
Make an XCom available for tasks to pull.
:param key: Key to store the value under.
:param value: Value to store. What types are possible depends on whether
``enable_xcom_pickling`` is true or not. If so, this can be any
picklable object; only be JSON-serializable may be used otherwise.
:param value: Value to store. Only JSON-serializable values may be used.
"""
XCom.set(
key=key,
Expand Down
28 changes: 4 additions & 24 deletions airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import inspect
import json
import logging
import pickle
from typing import TYPE_CHECKING, Any, Iterable, cast

from sqlalchemy import (
Expand Down Expand Up @@ -455,21 +454,8 @@ def serialize_value(
run_id: str | None = None,
map_index: int | None = None,
) -> Any:
"""Serialize XCom value to str or pickled object."""
if conf.getboolean("core", "enable_xcom_pickling"):
return pickle.dumps(value)
try:
return json.dumps(value, cls=XComEncoder).encode("UTF-8")
except (ValueError, TypeError) as ex:
log.error(
"%s."
" If you are using pickle instead of JSON for XCom,"
" then you need to enable pickle support for XCom"
" in your airflow config or make sure to decorate your"
" object with attr.",
ex,
)
raise
"""Serialize XCom value to UTF-8 encoded JSON bytes."""
return json.dumps(value, cls=XComEncoder).encode("UTF-8")

@staticmethod
def _deserialize_value(result: XCom, orm: bool) -> Any:
Expand All @@ -479,14 +465,8 @@ def _deserialize_value(result: XCom, orm: bool) -> Any:

if result.value is None:
return None
if conf.getboolean("core", "enable_xcom_pickling"):
try:
return pickle.loads(result.value)
except pickle.UnpicklingError:
return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)
else:
# Since xcom_pickling is disabled, we should only try to deserialize with JSON
return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)

return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)

@staticmethod
def deserialize_value(result: XCom) -> Any:
Expand Down
10 changes: 10 additions & 0 deletions newsfragments/aip-72.significant.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@ As part of this change the following breaking changes have occurred:
- Shipping DAGs via pickle is no longer supported

This was a feature that was not widely used and was a security risk. It has been removed.

- Pickling is no longer supported for XCom serialization.

XCom data will no longer support pickling. This change is intended to improve security and simplify data
handling by supporting JSON-only serialization. DAGs that depend on XCom pickling must update to use JSON-serializable data.

As part of that change, the ``[core] enable_xcom_pickling`` configuration option has been removed.

If you still need to use pickling, you can use a custom XCom backend that stores references in the metadata DB and
keeps the pickled data in separate storage such as S3.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def execute(self, context: Context) -> KustoResultTable | str:
https://docs.microsoft.com/en-us/azure/kusto/api/rest/response2
"""
response = self.hook.run_query(self.query, self.database, self.options)
if conf.getboolean("core", "enable_xcom_pickling"):
# TODO: Remove this after minimum Airflow version is 3.0
if conf.getboolean("core", "enable_xcom_pickling", fallback=False):
return response.primary_results[0]
else:
return str(response.primary_results[0])
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def execute(self, context: Context) -> list | str:

if return_code == 0:
# returning output if do_xcom_push is set
enable_pickling = conf.getboolean("core", "enable_xcom_pickling")
# TODO: Remove this after minimum Airflow version is 3.0
enable_pickling = conf.getboolean("core", "enable_xcom_pickling", fallback=False)

if enable_pickling:
return stdout_buffer
Expand Down
3 changes: 2 additions & 1 deletion providers/src/airflow/providers/ssh/operators/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ def execute(self, context=None) -> bytes | str:

with self.get_ssh_client() as ssh_client:
result = self.run_ssh_client_command(ssh_client, self.command, context=context)
enable_pickling = conf.getboolean("core", "enable_xcom_pickling")
# TODO: Remove this after minimum Airflow version is 3.0
enable_pickling = conf.getboolean("core", "enable_xcom_pickling", fallback=False)
if not enable_pickling:
result = b64encode(result).decode("utf-8")

Expand Down
7 changes: 7 additions & 0 deletions providers/tests/sftp/operators/test_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from airflow.utils import timezone
from airflow.utils.timezone import datetime

from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS
from tests_common.test_utils.config import conf_vars

pytestmark = pytest.mark.db_test
Expand Down Expand Up @@ -95,6 +96,7 @@ def teardown_method(self):
if os.path.exists(self.test_remote_dir):
os.rmdir(self.test_remote_dir)

@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_pickle_file_transfer_put(self, dag_maker):
test_local_file_content = (
Expand Down Expand Up @@ -129,6 +131,7 @@ def test_pickle_file_transfer_put(self, dag_maker):
pulled = tis["check_file_task"].xcom_pull(task_ids="check_file_task", key="return_value")
assert pulled.strip() == test_local_file_content

@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_file_transfer_no_intermediate_dir_error_put(self, create_task_instance_of_operator):
test_local_file_content = (
Expand Down Expand Up @@ -158,6 +161,7 @@ def test_file_transfer_no_intermediate_dir_error_put(self, create_task_instance_
ti2.run()
assert "No such file" in str(ctx.value)

@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_file_transfer_with_intermediate_dir_put(self, dag_maker):
test_local_file_content = (
Expand Down Expand Up @@ -232,6 +236,7 @@ def create_remote_file_and_cleanup(self):
yield
os.remove(self.test_remote_filepath)

@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_pickle_file_transfer_get(self, dag_maker, create_remote_file_and_cleanup):
with dag_maker(dag_id="unit_tests_sftp_op_pickle_file_transfer_get"):
Expand Down Expand Up @@ -275,6 +280,7 @@ def test_json_file_transfer_get(self, dag_maker, create_remote_file_and_cleanup)
content_received = file.read()
assert content_received == self.test_remote_file_content

@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_file_transfer_no_intermediate_dir_error_get(self, dag_maker, create_remote_file_and_cleanup):
with dag_maker(dag_id="unit_tests_sftp_op_file_transfer_no_intermediate_dir_error_get"):
Expand All @@ -298,6 +304,7 @@ def test_file_transfer_no_intermediate_dir_error_get(self, dag_maker, create_rem
ti.run()
assert "No such file" in str(ctx.value)

@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_file_transfer_with_intermediate_dir_error_get(self, dag_maker, create_remote_file_and_cleanup):
with dag_maker(dag_id="unit_tests_sftp_op_file_transfer_with_intermediate_dir_error_get"):
Expand Down
30 changes: 0 additions & 30 deletions tests/api_connexion/endpoints/test_xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,36 +158,6 @@ def test_should_respond_200_native(self):
"value": {"key": "value"},
}

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_should_respond_200_native_for_pickled(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
logical_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
logical_date_parsed = timezone.parse(logical_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
value_non_serializable_key = {("201009_NB502104_0421_AHJY23BGXG (SEQ_WF: 138898)", None): 82359}
self._create_xcom_entry(
dag_id, run_id, logical_date_parsed, task_id, xcom_key, {"key": value_non_serializable_key}
)
response = self.client.get(
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
environ_overrides={"REMOTE_USER": "test"},
)
assert 200 == response.status_code

current_data = response.json
current_data["timestamp"] = "TIMESTAMP"
assert current_data == {
"dag_id": dag_id,
"logical_date": logical_date,
"key": xcom_key,
"task_id": task_id,
"map_index": -1,
"timestamp": "TIMESTAMP",
"value": f"{{'key': {str(value_non_serializable_key)}}}",
}

def test_should_raise_404_for_non_existent_xcom(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
Expand Down
51 changes: 0 additions & 51 deletions tests/api_connexion/schemas/test_xcom_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,18 @@
# under the License.
from __future__ import annotations

import pickle

import pytest
from sqlalchemy import or_, select

from airflow.api_connexion.schemas.xcom_schema import (
XComCollection,
xcom_collection_item_schema,
xcom_collection_schema,
xcom_schema_string,
)
from airflow.models import DagRun, XCom
from airflow.utils import timezone
from airflow.utils.session import create_session

from tests_common.test_utils.config import conf_vars

pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]


Expand Down Expand Up @@ -184,49 +179,3 @@ def test_serialize(self, create_xcom, session):
"total_entries": 2,
},
)


class TestXComSchema:
default_time = "2016-04-02T21:00:00+00:00"
default_time_parsed = timezone.parse(default_time)

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_serialize(self, create_xcom, session):
create_xcom(
dag_id="test_dag",
task_id="test_task_id",
logical_date=self.default_time_parsed,
key="test_key",
value=pickle.dumps(b"test_binary"),
)
xcom_model = session.query(XCom).first()
deserialized_xcom = xcom_schema_string.dump(xcom_model)
assert deserialized_xcom == {
"key": "test_key",
"timestamp": self.default_time,
"logical_date": self.default_time,
"task_id": "test_task_id",
"dag_id": "test_dag",
"value": "test_binary",
"map_index": -1,
}

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_deserialize(self):
xcom_dump = {
"key": "test_key",
"timestamp": self.default_time,
"logical_date": self.default_time,
"task_id": "test_task_id",
"dag_id": "test_dag",
"value": b"test_binary",
}
result = xcom_schema_string.load(xcom_dump)
assert result == {
"key": "test_key",
"timestamp": self.default_time_parsed,
"logical_date": self.default_time_parsed,
"task_id": "test_task_id",
"dag_id": "test_dag",
"value": "test_binary",
}
21 changes: 0 additions & 21 deletions tests/api_fastapi/core_api/routes/public/test_xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@

TEST_XCOM_KEY = "test_xcom_key"
TEST_XCOM_VALUE = {"key": "value"}
TEST_XCOM_KEY2 = "test_xcom_key_non_serializable"
TEST_XCOM_VALUE2 = {"key": {("201009_NB502104_0421_AHJY23BGXG (SEQ_WF: 138898)", None): 82359}}
TEST_XCOM_KEY3 = "test_xcom_key_non_existing"

TEST_DAG_ID = "test-dag-id"
Expand Down Expand Up @@ -140,25 +138,6 @@ def test_should_respond_200_native(self, test_client):
"value": TEST_XCOM_VALUE,
}

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_should_respond_200_pickled(self, test_client):
self.create_xcom(TEST_XCOM_KEY2, TEST_XCOM_VALUE2)
response = test_client.get(
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY2}"
)
assert response.status_code == 200

current_data = response.json()
assert current_data == {
"dag_id": TEST_DAG_ID,
"logical_date": logical_date_parsed.strftime("%Y-%m-%dT%H:%M:%SZ"),
"key": TEST_XCOM_KEY2,
"task_id": TEST_TASK_ID,
"map_index": -1,
"timestamp": current_data["timestamp"],
"value": str(TEST_XCOM_VALUE2),
}

def test_should_raise_404_for_non_existent_xcom(self, test_client):
response = test_client.get(
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY3}"
Expand Down
Loading

0 comments on commit 6faa720

Please sign in to comment.