Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace dill package to use cloudpickle #34

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial_taskflow_api_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def tutorial_taskflow_api_virtualenv():
"""

@task.virtualenv(
use_dill=True,
use_cloudpickle=True,
system_site_packages=False,
requirements=["funcsigs"],
)
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/dagpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from typing import TYPE_CHECKING

import dill
import cloudpickle
from sqlalchemy import BigInteger, Column, Integer, PickleType

from airflow.models.base import Base
Expand All @@ -42,7 +42,7 @@ class DagPickle(Base):
"""

id = Column(Integer, primary_key=True)
pickle = Column(PickleType(pickler=dill))
pickle = Column(PickleType(pickler=cloudpickle))
created_dttm = Column(UtcDateTime, default=timezone.utcnow)
pickle_hash = Column(BigInteger)

Expand Down
4 changes: 2 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from typing import TYPE_CHECKING, Any, Callable, Collection, Generator, Iterable, Mapping, Tuple
from urllib.parse import quote

import dill
import cloudpickle
import jinja2
import lazy_object_proxy
import pendulum
Expand Down Expand Up @@ -1287,7 +1287,7 @@ class TaskInstance(Base, LoggingMixin):
queued_dttm = Column(UtcDateTime)
queued_by_job_id = Column(Integer)
pid = Column(Integer)
executor_config = Column(ExecutorConfigType(pickler=dill))
executor_config = Column(ExecutorConfigType(pickler=cloudpickle))
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow)
rendered_map_index = Column(String(250))

Expand Down
57 changes: 43 additions & 14 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Mapping, NamedTuple, Sequence, cast

import dill
import cloudpickle

from airflow.compat.functools import cache
from airflow.exceptions import (
AirflowConfigException,
AirflowException,
AirflowProviderDeprecationWarning,
AirflowSkipException,
DeserializingResultError,
RemovedInAirflow3Warning,
Expand Down Expand Up @@ -392,6 +393,7 @@ def __init__(
*,
python_callable: Callable,
use_dill: bool = False,
VladaZakharova marked this conversation as resolved.
Show resolved Hide resolved
use_cloudpickle: bool = False,
op_args: Collection[Any] | None = None,
op_kwargs: Mapping[str, Any] | None = None,
string_args: Iterable[str] | None = None,
Expand All @@ -416,8 +418,16 @@ def __init__(
**kwargs,
)
self.string_args = string_args or []
self.use_dill = use_dill
self.pickling_library = dill if self.use_dill else pickle
if use_dill:
warnings.warn(
"The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use "
"'use_cloudpickle' instead. ",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
use_cloudpickle = use_dill
self.use_cloudpickle = use_cloudpickle
self.pickling_library = cloudpickle if self.use_cloudpickle else pickle
self.expect_airflow = expect_airflow
self.skip_on_exit_code = (
skip_on_exit_code
Expand Down Expand Up @@ -548,9 +558,9 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
VladaZakharova marked this conversation as resolved.
Show resolved Hide resolved
:param use_cloudpickle: Whether to use cloudpickle to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include cloudpickle in your requirements.
:param system_site_packages: Whether to include
system_site_packages in your virtual environment.
See virtualenv documentation for more information.
Expand Down Expand Up @@ -593,6 +603,7 @@ def __init__(
requirements: None | Iterable[str] | str = None,
python_version: str | None = None,
use_dill: bool = False,
use_cloudpickle: bool = False,
system_site_packages: bool = True,
pip_install_options: list[str] | None = None,
op_args: Collection[Any] | None = None,
Expand Down Expand Up @@ -623,6 +634,14 @@ def __init__(
RemovedInAirflow3Warning,
stacklevel=2,
)
if use_dill:
warnings.warn(
"The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use "
"'use_cloudpickle' instead. ",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
use_cloudpickle = use_dill
if not is_venv_installed():
raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.")
if not requirements:
Expand All @@ -643,7 +662,7 @@ def __init__(
self.venv_cache_path = venv_cache_path
super().__init__(
python_callable=python_callable,
use_dill=use_dill,
use_cloudpickle=use_cloudpickle,
op_args=op_args,
op_kwargs=op_kwargs,
string_args=string_args,
Expand All @@ -657,8 +676,8 @@ def __init__(
def _requirements_list(self) -> list[str]:
"""Prepare a list of requirements that need to be installed for the virtual environment."""
requirements = [str(dependency) for dependency in self.requirements]
if not self.system_site_packages and self.use_dill and "dill" not in requirements:
requirements.append("dill")
if not self.system_site_packages and self.use_cloudpickle and "cloudpickle" not in requirements:
requirements.append("cloudpickle")
requirements.sort() # Ensure a hash is stable
return requirements

Expand Down Expand Up @@ -816,10 +835,11 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param python_callable: A python function with no references to outside variables,
defined with def, which will be run in a virtual environment
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but if dill is not preinstalled in your virtual environment, the task will fail with use_dill enabled.
defined with def, which will be run in a virtual environment.
:param use_cloudpickle: Whether to use cloudpickle to serialize
the args and result (pickle is default). This allows more complex types
but if cloudpickle is not preinstalled in your virtual environment, the task will fail
with use_cloudpickle enabled.
:param op_args: A list of positional arguments to pass to python_callable.
:param op_kwargs: A dict of keyword arguments to pass to python_callable.
:param string_args: Strings that are present in the global var virtualenv_string_args,
Expand Down Expand Up @@ -847,6 +867,7 @@ def __init__(
python: str,
python_callable: Callable,
use_dill: bool = False,
use_cloudpickle: bool = False,
op_args: Collection[Any] | None = None,
op_kwargs: Mapping[str, Any] | None = None,
string_args: Iterable[str] | None = None,
Expand All @@ -859,11 +880,19 @@ def __init__(
):
if not python:
raise ValueError("Python Path must be defined in ExternalPythonOperator")
if use_dill:
warnings.warn(
"The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use "
"'use_cloudpickle' instead. ",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
use_cloudpickle = use_dill
self.python = python
self.expect_pendulum = expect_pendulum
super().__init__(
python_callable=python_callable,
use_dill=use_dill,
use_cloudpickle=use_cloudpickle,
op_args=op_args,
op_kwargs=op_kwargs,
string_args=string_args,
Expand Down
20 changes: 16 additions & 4 deletions airflow/providers/cncf/kubernetes/decorators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
import os
import pickle
import uuid
import warnings
from shlex import quote
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Callable, Sequence

import dill
import cloudpickle
from kubernetes.client import models as k8s

from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.python_kubernetes_script import (
write_python_script,
Expand Down Expand Up @@ -65,8 +67,18 @@ class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
# there are some cases we can't deepcopy the objects (e.g protobuf).
shallow_copy_attrs: Sequence[str] = ("python_callable",)

def __init__(self, namespace: str = "default", use_dill: bool = False, **kwargs) -> None:
self.use_dill = use_dill
def __init__(
self, namespace: str = "default", use_dill: bool = False, use_cloudpickle: bool = False, **kwargs
) -> None:
if use_dill:
warnings.warn(
"The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use "
"'use_cloudpickle' instead. ",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
use_cloudpickle = use_dill
self.use_cloudpickle = use_cloudpickle
super().__init__(
namespace=namespace,
name=kwargs.pop("name", f"k8s_airflow_pod_{uuid.uuid4().hex}"),
Expand Down Expand Up @@ -100,7 +112,7 @@ def _generate_cmds(self) -> list[str]:

def execute(self, context: Context):
with TemporaryDirectory(prefix="venv") as tmp_dir:
pickling_library = dill if self.use_dill else pickle
pickling_library = cloudpickle if self.use_cloudpickle else pickle
script_filename = os.path.join(tmp_dir, "script.py")
input_filename = os.path.join(tmp_dir, "script.in")

Expand Down
21 changes: 16 additions & 5 deletions airflow/providers/docker/decorators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import base64
import os
import pickle
import warnings
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Callable, Sequence

import dill
import cloudpickle

from airflow.decorators.base import DecoratedOperator, task_decorator_factory
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.python_virtualenv import write_python_script

Expand Down Expand Up @@ -53,7 +55,7 @@ class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):

:param python_callable: A reference to an object that is callable
:param python: Python binary name to use
:param use_dill: Whether dill should be used to serialize the callable
:param use_cloudpickle: Whether cloudpickle should be used to serialize the callable
:param expect_airflow: whether to expect airflow to be installed in the docker environment. if this
one is specified, the script to run callable will attempt to load Airflow macros.
:param op_kwargs: a dictionary of keyword arguments that will get unpacked
Expand All @@ -72,14 +74,23 @@ class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
def __init__(
self,
use_dill=False,
use_cloudpickle=False,
python_command="python3",
expect_airflow: bool = True,
**kwargs,
) -> None:
command = "placeholder command"
self.python_command = python_command
self.expect_airflow = expect_airflow
self.use_dill = use_dill
if use_dill:
warnings.warn(
"The 'use_dill' parameter is deprecated and will be removed after 01.10.2024. Please use "
"'use_cloudpickle' instead. ",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
use_cloudpickle = use_dill
self.use_cloudpickle = use_cloudpickle
super().__init__(
command=command, retrieve_output=True, retrieve_output_path="/tmp/script.out", **kwargs
)
Expand Down Expand Up @@ -128,8 +139,8 @@ def execute(self, context: Context):

@property
def pickling_library(self):
if self.use_dill:
return dill
if self.use_cloudpickle:
return cloudpickle
return pickle


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from typing import TYPE_CHECKING, Callable, Iterable, TypeVar
from urllib.parse import urlsplit

import dill
import cloudpickle

from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
Expand Down Expand Up @@ -234,7 +234,7 @@ def validate_err_and_count(summary):
dag=dag,
)

metric_fn_encoded = base64.b64encode(dill.dumps(metric_fn, recurse=True)).decode()
metric_fn_encoded = base64.b64encode(cloudpickle.dumps(metric_fn)).decode()
evaluate_summary = BeamRunPythonPipelineOperator(
task_id=(task_prefix + "-summary"),
runner=BeamRunnerType.DataflowRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
- ``--metric_fn_encoded``:
An encoded function that calculates and returns a tuple of metric(s)
for a given instance (as a dictionary). It should be encoded
via ``base64.b64encode(dill.dumps(fn, recurse=True))``.
via ``base64.b64encode(cloudpickle.dumps(fn))``.
- ``--metric_keys``:
A comma-separated key(s) of the aggregated metric(s) in the summary
output. The order and the size of the keys must match to the output
Expand All @@ -57,7 +57,7 @@ def metric_fn(inst):
squared_err = (classes-label)**2
return (log_loss, squared_err)
return metric_fn
metric_fn_encoded = base64.b64encode(dill.dumps(get_metric_fn(), recurse=True))
metric_fn_encoded = base64.b64encode(cloudpickle.dumps(get_metric_fn()))
DataflowCreatePythonJobOperator(
task_id="summary-prediction",
py_options=["-m"],
Expand Down Expand Up @@ -116,7 +116,7 @@ def metric_fn(inst):
import os

import apache_beam as beam
import dill
import cloudpickle
from apache_beam.coders.coders import Coder


Expand Down Expand Up @@ -170,7 +170,7 @@ def run(argv=None):
help=(
"An encoded function that calculates and returns a tuple of "
"metric(s) for a given instance (as a dictionary). It should be "
"encoded via base64.b64encode(dill.dumps(fn, recurse=True))."
"encoded via base64.b64encode(cloudpickle.dumps(fn))."
),
)
parser.add_argument(
Expand All @@ -186,7 +186,7 @@ def run(argv=None):
)
known_args, pipeline_args = parser.parse_known_args(argv)

metric_fn = dill.loads(base64.b64decode(known_args.metric_fn_encoded))
metric_fn = cloudpickle.loads(base64.b64decode(known_args.metric_fn_encoded))
if not callable(metric_fn):
raise ValueError("--metric_fn_encoded must be an encoded callable.")
metric_keys = known_args.metric_keys.split(",")
Expand Down
1 change: 1 addition & 0 deletions hatch_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@
"blinker>=1.6.2",
# Colorlog 6.x merges TTYColoredFormatter into ColoredFormatter, breaking backwards compatibility with 4.x
# Update CustomTTYColoredFormatter to remove
"cloudpickle>=2.0.0",
"colorlog>=4.0.2, <5.0",
"configupdater>=3.1.1",
# `airflow/www/extensions/init_views` imports `connexion.decorators.validation.RequestBodyValidator`
Expand Down
Loading
Loading