From edb603714c48399ba6af8658dedd20e94267b764 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Thu, 18 Apr 2024 02:42:33 +0800 Subject: [PATCH] Raw Container Task Local Execution (#2258) * init Signed-off-by: Future-Outlier * v1 Signed-off-by: Future-Outlier * arguments bug fixed and add log when pulling image Signed-off-by: Future-Outlier * change v to k and handle boolean special case Signed-off-by: Future-Outlier * support blob type and datetime Signed-off-by: Future-Outlier * add unit tests Signed-off-by: Future-Outlier * add exception Signed-off-by: Future-Outlier * nit Signed-off-by: Future-Outlier * fix test Signed-off-by: Future-Outlier * update for flytefile and flytedirectory Signed-off-by: Future-Outlier * support both file paths and template inputs Signed-off-by: Future-Outlier * pytest use sys platform to handle macos and windows case and support regex to parse the input Signed-off-by: Future-Outlier * support datetime.timedelta Signed-off-by: Future-Outlier * lint Signed-off-by: Future-Outlier * add tests and change boolean logic Signed-off-by: Future-Outlier * support Signed-off-by: Future-Outlier * change annotations Signed-off-by: Future-Outlier * add flytefile and flytedir tests Signed-off-by: Future-Outlier * lint Signed-off-by: Future-Outlier * add more tests Signed-off-by: Future-Outlier * lint Signed-off-by: Future-Outlier * change image name Signed-off-by: Future-Outlier * Update pingsu's advice Signed-off-by: Future-Outlier Co-authored-by: Kevin Su * add docker in dev-requirement Signed-off-by: Future-Outlier * refactor execution Signed-off-by: Future-Outlier * use render pattern Signed-off-by: Future-Outlier * add back container task object in test Signed-off-by: Future-Outlier * refactor output in container task execution Signed-off-by: Future-Outlier * update pingsu's render input advice Signed-off-by: Future-Outlier * update tests Signed-off-by: Future-Outlier * add LiteralMap TypeHints Signed-off-by: Future-Outlier * update dev-req 
Signed-off-by: Future-Outlier --------- Signed-off-by: Future-Outlier Co-authored-by: Kevin Su Signed-off-by: Jan Fiedler --- flytekit/core/container_task.py | 165 ++++++++++++- .../unit/core/Dockerfile.raw_container | 9 + tests/flytekit/unit/core/return_same_value.py | 22 ++ .../flytekit/unit/core/test_container_task.py | 87 +++++-- .../unit/core/test_local_raw_container.py | 218 ++++++++++++++++++ tests/flytekit/unit/core/write_flytedir.py | 24 ++ tests/flytekit/unit/core/write_flytefile.py | 17 ++ 7 files changed, 525 insertions(+), 17 deletions(-) create mode 100644 tests/flytekit/unit/core/Dockerfile.raw_container create mode 100644 tests/flytekit/unit/core/return_same_value.py create mode 100644 tests/flytekit/unit/core/test_local_raw_container.py create mode 100644 tests/flytekit/unit/core/write_flytedir.py create mode 100644 tests/flytekit/unit/core/write_flytefile.py diff --git a/flytekit/core/container_task.py b/flytekit/core/container_task.py index 7773226c1a..66fe522c07 100644 --- a/flytekit/core/container_task.py +++ b/flytekit/core/container_task.py @@ -1,6 +1,7 @@ +import os import typing from enum import Enum -from typing import Any, Dict, List, Optional, OrderedDict, Type +from typing import Any, Dict, List, Optional, OrderedDict, Tuple, Type from flytekit.configuration import SerializationSettings from flytekit.core.base_task import PythonTask, TaskMetadata @@ -11,10 +12,13 @@ from flytekit.core.resources import Resources, ResourceSpec from flytekit.core.utils import _get_container_definition, _serialize_pod_spec from flytekit.image_spec.image_spec import ImageSpec +from flytekit.loggers import logger from flytekit.models import task as _task_model +from flytekit.models.literals import LiteralMap from flytekit.models.security import Secret, SecurityContext _PRIMARY_CONTAINER_NAME_FIELD = "primary_container_name" +DOCKER_IMPORT_ERROR_MESSAGE = "Docker is not installed. Please install Docker by running `pip install docker`." 
class ContainerTask(PythonTask): @@ -82,6 +86,7 @@ def __init__( self._args = arguments self._input_data_dir = input_data_dir self._output_data_dir = output_data_dir + self._outputs = outputs self._md_format = metadata_format self._io_strategy = io_strategy self._resources = ResourceSpec( @@ -93,8 +98,162 @@ def __init__( def resources(self) -> ResourceSpec: return self._resources - def local_execute(self, ctx: FlyteContext, **kwargs) -> Any: - raise RuntimeError("ContainerTask is not supported in local executions.") + def _extract_command_key(self, cmd: str, **kwargs) -> Any: + """ + Extract the key from the command using regex. + """ + import re + + input_regex = r"^\{\{\s*\.inputs\.(.*?)\s*\}\}$" + match = re.match(input_regex, cmd) + if match: + return match.group(1) + return None + + def _render_command_and_volume_binding(self, cmd: str, **kwargs) -> Tuple[str, Dict[str, Dict[str, str]]]: + """ + We support template-style references to inputs, e.g., "{{.inputs.infile}}". + """ + from flytekit.types.directory import FlyteDirectory + from flytekit.types.file import FlyteFile + + command = "" + volume_binding = {} + k = self._extract_command_key(cmd) + + if k: + input_val = kwargs.get(k) + if type(input_val) in [FlyteFile, FlyteDirectory]: + local_flyte_file_or_dir_path = str(input_val) + remote_flyte_file_or_dir_path = os.path.join(self._input_data_dir, k.replace(".", "/")) # type: ignore + volume_binding[local_flyte_file_or_dir_path] = { + "bind": remote_flyte_file_or_dir_path, + "mode": "rw", + } + command = remote_flyte_file_or_dir_path + else: + command = str(input_val) + else: + command = cmd + + return command, volume_binding + + def _prepare_command_and_volumes( + self, cmd_and_args: List[str], **kwargs + ) -> Tuple[List[str], Dict[str, Dict[str, str]]]: + """ + Prepares the command and volume bindings for the container based on input arguments and command templates. + + Parameters: + - cmd_and_args (List[str]): The command and arguments to prepare. 
+ - **kwargs: Keyword arguments representing task inputs. + + Returns: + - Tuple[List[str], Dict[str, Dict[str, str]]]: A tuple containing the prepared commands and volume bindings. + """ + + commands = [] + volume_bindings = {} + + for cmd in cmd_and_args: + command, volume_binding = self._render_command_and_volume_binding(cmd, **kwargs) + commands.append(command) + volume_bindings.update(volume_binding) + + return commands, volume_bindings + + def _pull_image_if_not_exists(self, client, image: str): + try: + if not client.images.list(filters={"reference": image}): + logger.info(f"Pulling image: {image} for container task: {self.name}") + client.images.pull(image) + except Exception as e: + logger.error(f"Failed to pull image {image}: {str(e)}") + raise + + def _string_to_timedelta(self, s: str): + import datetime + import re + + regex = r"(?:(\d+) days?, )?(?:(\d+):)?(\d+):(\d+)(?:\.(\d+))?" + parts = re.match(regex, s) + if not parts: + raise ValueError("Invalid timedelta string format") + + days = int(parts.group(1)) if parts.group(1) else 0 + hours = int(parts.group(2)) if parts.group(2) else 0 + minutes = int(parts.group(3)) if parts.group(3) else 0 + seconds = int(parts.group(4)) if parts.group(4) else 0 + microseconds = int(parts.group(5)) if parts.group(5) else 0 + + return datetime.timedelta( + days=days, + hours=hours, + minutes=minutes, + seconds=seconds, + microseconds=microseconds, + ) + + def _convert_output_val_to_correct_type(self, output_val: Any, output_type: Any) -> Any: + import datetime + + if output_type == bool: + return output_val.lower() != "false" + elif output_type == datetime.datetime: + return datetime.datetime.fromisoformat(output_val) + elif output_type == datetime.timedelta: + return self._string_to_timedelta(output_val) + else: + return output_type(output_val) + + def _get_output_dict(self, output_directory: str) -> Dict[str, Any]: + from flytekit.types.directory import FlyteDirectory + from flytekit.types.file import FlyteFile + + 
output_dict = {} + if self._outputs: + for k, output_type in self._outputs.items(): + output_path = os.path.join(output_directory, k) + if output_type in [FlyteFile, FlyteDirectory]: + output_dict[k] = output_type(path=output_path) + else: + with open(output_path, "r") as f: + output_val = f.read() + output_dict[k] = self._convert_output_val_to_correct_type(output_val, output_type) + return output_dict + + def execute(self, **kwargs) -> LiteralMap: + try: + import docker + except ImportError: + raise ImportError(DOCKER_IMPORT_ERROR_MESSAGE) + + from flytekit.core.type_engine import TypeEngine + + ctx = FlyteContext.current_context() + + # Normalize the input and output directories + self._input_data_dir = os.path.normpath(self._input_data_dir) if self._input_data_dir else "" + self._output_data_dir = os.path.normpath(self._output_data_dir) if self._output_data_dir else "" + + output_directory = ctx.file_access.get_random_local_directory() + cmd_and_args = (self._cmd or []) + (self._args or []) + commands, volume_bindings = self._prepare_command_and_volumes(cmd_and_args, **kwargs) + volume_bindings[output_directory] = {"bind": self._output_data_dir, "mode": "rw"} + + client = docker.from_env() + self._pull_image_if_not_exists(client, self._image) + + container = client.containers.run( + self._image, command=commands, remove=True, volumes=volume_bindings, detach=True + ) + # Wait for the container to finish the task + # TODO: Add a 'timeout' parameter to control the max wait time for the container to finish the task. 
+ container.wait() + + output_dict = self._get_output_dict(output_directory) + outputs_literal_map = TypeEngine.dict_to_literal_map(ctx, output_dict) + return outputs_literal_map def get_container(self, settings: SerializationSettings) -> _task_model.Container: # if pod_template is specified, return None here but in get_k8s_pod, return pod_template merged with container diff --git a/tests/flytekit/unit/core/Dockerfile.raw_container b/tests/flytekit/unit/core/Dockerfile.raw_container new file mode 100644 index 0000000000..f32efd871a --- /dev/null +++ b/tests/flytekit/unit/core/Dockerfile.raw_container @@ -0,0 +1,9 @@ +FROM python:3.9-alpine + +WORKDIR /root + +COPY ./write_flytefile.py /root/write_flytefile.py +COPY ./write_flytedir.py /root/write_flytedir.py +COPY ./return_same_value.py /root/return_same_value.py + +CMD ["/bin/sh"] diff --git a/tests/flytekit/unit/core/return_same_value.py b/tests/flytekit/unit/core/return_same_value.py new file mode 100644 index 0000000000..897b0af29d --- /dev/null +++ b/tests/flytekit/unit/core/return_same_value.py @@ -0,0 +1,22 @@ +import os +import sys + + +def write_output(output_dir, output_file, v): + # Ensure the output directory exists + os.makedirs(output_dir, exist_ok=True) # This will create the directory if it doesn't exist + with open(f"{output_dir}/{output_file}", "w") as f: + f.write(str(v)) + + +def main(*args, output_dir): + # Generate output files for each input argument + for i, arg in enumerate(args, start=1): + # Using i to generate filenames like 'a', 'b', 'c', ... 
+ output_file = chr(ord("a") + i - 1) + write_output(output_dir, output_file, arg) + + +if __name__ == "__main__": + *inputs, output_dir = sys.argv[1:] # Unpack all inputs except for the last one for output_dir + main(*inputs, output_dir=output_dir) diff --git a/tests/flytekit/unit/core/test_container_task.py b/tests/flytekit/unit/core/test_container_task.py index c89ec11345..1281a9ec14 100644 --- a/tests/flytekit/unit/core/test_container_task.py +++ b/tests/flytekit/unit/core/test_container_task.py @@ -1,4 +1,7 @@ +import os +import sys from collections import OrderedDict +from typing import Tuple import pytest from kubernetes.client.models import ( @@ -13,7 +16,7 @@ V1Toleration, ) -from flytekit import kwtypes +from flytekit import kwtypes, task, workflow from flytekit.configuration import Image, ImageConfig, SerializationSettings from flytekit.core.container_task import ContainerTask from flytekit.core.pod_template import PodTemplate @@ -21,6 +24,75 @@ from flytekit.tools.translator import get_serializable_task +@pytest.mark.skipif( + sys.platform in ["darwin", "win32"], + reason="Skip if running on windows or macos due to CI Docker environment setup failure", +) +def test_local_execution(): + calculate_ellipse_area_python_template_style = ContainerTask( + name="calculate_ellipse_area_python_template_style", + input_data_dir="/var/inputs", + output_data_dir="/var/outputs", + inputs=kwtypes(a=float, b=float), + outputs=kwtypes(area=float, metadata=str), + image="ghcr.io/flyteorg/rawcontainers-python:v2", + command=[ + "python", + "calculate-ellipse-area.py", + "{{.inputs.a}}", + "{{.inputs.b}}", + "/var/outputs", + ], + ) + + area, metadata = calculate_ellipse_area_python_template_style(a=3.0, b=4.0) + assert isinstance(area, float) + assert isinstance(metadata, str) + + # Workflow execution with container task + @task + def t1(a: float, b: float) -> Tuple[float, float]: + return a + b, a * b + + @workflow + def wf(a: float, b: float) -> Tuple[float, str]: + a, 
b = t1(a=a, b=b) + area, metadata = calculate_ellipse_area_python_template_style(a=a, b=b) + return area, metadata + + area, metadata = wf(a=3.0, b=4.0) + assert isinstance(area, float) + assert isinstance(metadata, str) + + +@pytest.mark.skipif( + sys.platform == "win32", + reason="Skip if running on windows due to path error", +) +def test_local_execution_special_cases(): + # Boolean conversion from string checks + assert all([bool(s) for s in ["False", "false", "True", "true"]]) + + # Path normalization + input_data_dir = "/var/inputs" + assert os.path.normpath(input_data_dir) == "/var/inputs" + assert os.path.normpath(input_data_dir + "/") == "/var/inputs" + + # Datetime and timedelta string conversions + ct = ContainerTask( + name="local-execution", + image="test-image", + command="echo", + ) + + from datetime import datetime, timedelta + + now = datetime.now() + assert datetime.fromisoformat(str(now)) == now + td = timedelta(days=1, hours=1, minutes=1, seconds=1, microseconds=1) + assert td == ct._string_to_timedelta(str(td)) + + def test_pod_template(): ps = V1PodSpec( containers=[], tolerations=[V1Toleration(effect="NoSchedule", key="nvidia.com/gpu", operator="Exists")] @@ -86,19 +158,6 @@ def test_pod_template(): assert serialized_pod_spec["runtimeClassName"] == "nvidia" -def test_local_execution(): - ct = ContainerTask( - name="name", - input_data_dir="/var/inputs", - output_data_dir="/var/outputs", - image="inexistent-image:v42", - command=["some", "command"], - ) - - with pytest.raises(RuntimeError): - ct() - - def test_raw_container_with_image_spec(mock_image_spec_builder): ImageBuildEngine.register("test-raw-container", mock_image_spec_builder) image_spec = ImageSpec(registry="flyte", base_image="r-base", builder="test-raw-container") diff --git a/tests/flytekit/unit/core/test_local_raw_container.py b/tests/flytekit/unit/core/test_local_raw_container.py new file mode 100644 index 0000000000..c92d2b66be --- /dev/null +++ 
b/tests/flytekit/unit/core/test_local_raw_container.py @@ -0,0 +1,218 @@ +import datetime +import os +import sys +from pathlib import Path +from typing import Tuple + +import docker +import pytest + +import flytekit +from flytekit import ContainerTask, kwtypes, task, workflow +from flytekit.types.directory import FlyteDirectory +from flytekit.types.file import FlyteFile + + +@pytest.mark.skipif( + sys.platform in ["darwin", "win32"], + reason="Skip if running on windows or macos due to CI Docker environment setup failure", +) +def test_flytefile_wf(): + client = docker.from_env() + path_to_dockerfile = "tests/flytekit/unit/core/" + dockerfile_name = "Dockerfile.raw_container" + client.images.build(path=path_to_dockerfile, dockerfile=dockerfile_name, tag="flytekit:rawcontainer") + + flyte_file_io = ContainerTask( + name="flyte_file_io", + input_data_dir="/var/inputs", + output_data_dir="/var/outputs", + inputs=kwtypes(inputs=FlyteFile), + outputs=kwtypes(out=FlyteFile), + image="flytekit:rawcontainer", + command=[ + "python", + "write_flytefile.py", + "{{.inputs.inputs}}", + "/var/outputs/out", + ], + ) + + @task + def flyte_file_task() -> FlyteFile: + working_dir = flytekit.current_context().working_directory + write_file = os.path.join(working_dir, "flyte_file.txt") + with open(write_file, "w") as file: + file.write("This is flyte_file.txt file.") + return FlyteFile(path=write_file) + + @workflow + def flyte_file_io_wf() -> FlyteFile: + ff = flyte_file_task() + return flyte_file_io(inputs=ff) + + flytefile = flyte_file_io_wf() + assert isinstance(flytefile, FlyteFile) + + with open(flytefile.path, "r") as file: + content = file.read() + + assert content == "This is flyte_file.txt file." 
+ + +@pytest.mark.skipif( + sys.platform in ["darwin", "win32"], + reason="Skip if running on windows or macos due to CI Docker environment setup failure", +) +def test_flytedir_wf(): + client = docker.from_env() + path_to_dockerfile = "tests/flytekit/unit/core/" + dockerfile_name = "Dockerfile.raw_container" + client.images.build(path=path_to_dockerfile, dockerfile=dockerfile_name, tag="flytekit:rawcontainer") + + flyte_dir_io = ContainerTask( + name="flyte_dir_io", + input_data_dir="/var/inputs", + output_data_dir="/var/outputs", + inputs=kwtypes(inputs=FlyteDirectory), + outputs=kwtypes(out=FlyteDirectory), + image="flytekit:rawcontainer", + command=[ + "python", + "write_flytedir.py", + "{{.inputs.inputs}}", + "/var/outputs/out", + ], + ) + + @task + def flyte_dir_task() -> FlyteDirectory: + working_dir = flytekit.current_context().working_directory + local_dir = Path(os.path.join(working_dir, "csv_files")) + local_dir.mkdir(exist_ok=True) + write_file = os.path.join(local_dir, "flyte_dir.txt") + with open(write_file, "w") as file: + file.write("This is for flyte dir.") + + return FlyteDirectory(path=str(local_dir)) + + @workflow + def flyte_dir_io_wf() -> FlyteDirectory: + fd = flyte_dir_task() + return flyte_dir_io(inputs=fd) + + flytyedir = flyte_dir_io_wf() + assert isinstance(flytyedir, FlyteDirectory) + + with open(os.path.join(flytyedir.path, "flyte_dir.txt"), "r") as file: + content = file.read() + + assert content == "This is for flyte dir." 
+ + +@pytest.mark.skipif( + sys.platform in ["darwin", "win32"], + reason="Skip if running on windows or macos due to CI Docker environment setup failure", +) +def test_primitive_types_wf(): + client = docker.from_env() + path_to_dockerfile = "tests/flytekit/unit/core/" + dockerfile_name = "Dockerfile.raw_container" + client.images.build(path=path_to_dockerfile, dockerfile=dockerfile_name, tag="flytekit:rawcontainer") + + python_return_same_values = ContainerTask( + name="python_return_same_values", + input_data_dir="/var/inputs", + output_data_dir="/var/outputs", + inputs=kwtypes(a=int, b=bool, c=float, d=str, e=datetime.datetime, f=datetime.timedelta), + outputs=kwtypes(a=int, b=bool, c=float, d=str, e=datetime.datetime, f=datetime.timedelta), + image="flytekit:rawcontainer", + command=[ + "python", + "return_same_value.py", + "{{.inputs.a}}", + "{{.inputs.b}}", + "{{.inputs.c}}", + "{{.inputs.d}}", + "{{.inputs.e}}", + "{{.inputs.f}}", + "/var/outputs", + ], + ) + + @workflow + def primitive_types_io_wf( + a: int, b: bool, c: float, d: str, e: datetime.datetime, f: datetime.timedelta + ) -> Tuple[int, bool, float, str, datetime.datetime, datetime.timedelta]: + return python_return_same_values(a=a, b=b, c=c, d=d, e=e, f=f) + + now = datetime.datetime.now(tz=datetime.timezone.utc) + a, b, c, d, e, f = primitive_types_io_wf( + a=0, + b=False, + c=3.0, + d="hello", + e=now, + f=datetime.timedelta(days=1, hours=3, minutes=2, seconds=3, microseconds=5), + ) + + assert a == 0 + assert b is False + assert c == 3.0 + assert d == "hello" + assert e == now + assert f == datetime.timedelta(days=1, hours=3, minutes=2, seconds=3, microseconds=5) + + +@pytest.mark.skipif( + sys.platform in ["darwin", "win32"], + reason="Skip if running on windows or macos due to CI Docker environment setup failure", +) +def test_input_output_dir_manipulation(): + client = docker.from_env() + path_to_dockerfile = "tests/flytekit/unit/core/" + dockerfile_name = "Dockerfile.raw_container" + 
client.images.build(path=path_to_dockerfile, dockerfile=dockerfile_name, tag="flytekit:rawcontainer") + + python_return_same_values = ContainerTask( + name="python_return_same_values", + input_data_dir="/inputs", + output_data_dir="/outputs", + inputs=kwtypes(a=int, b=bool, c=float, d=str, e=datetime.datetime, f=datetime.timedelta), + outputs=kwtypes(a=int, b=bool, c=float, d=str, e=datetime.datetime, f=datetime.timedelta), + image="flytekit:rawcontainer", + command=[ + "python", + "return_same_value.py", + "{{.inputs.a}}", + "{{.inputs.b}}", + "{{.inputs.c}}", + "{{.inputs.d}}", + "{{.inputs.e}}", + "{{.inputs.f}}", + "/outputs", + ], + ) + + @workflow + def primitive_types_io_wf( + a: int, b: bool, c: float, d: str, e: datetime.datetime, f: datetime.timedelta + ) -> Tuple[int, bool, float, str, datetime.datetime, datetime.timedelta]: + return python_return_same_values(a=a, b=b, c=c, d=d, e=e, f=f) + + now = datetime.datetime.now(tz=datetime.timezone.utc) + a, b, c, d, e, f = primitive_types_io_wf( + a=0, + b=False, + c=3.0, + d="hello", + e=now, + f=datetime.timedelta(days=1, hours=3, minutes=2, seconds=3, microseconds=5), + ) + + assert a == 0 + assert b is False + assert c == 3.0 + assert d == "hello" + assert e == now + assert f == datetime.timedelta(days=1, hours=3, minutes=2, seconds=3, microseconds=5) diff --git a/tests/flytekit/unit/core/write_flytedir.py b/tests/flytekit/unit/core/write_flytedir.py new file mode 100644 index 0000000000..7cb5e0d77e --- /dev/null +++ b/tests/flytekit/unit/core/write_flytedir.py @@ -0,0 +1,24 @@ +import shutil +import sys +from pathlib import Path + + +def copy_directory(input_path: Path, output_path: Path): + if not input_path.exists() or not input_path.is_dir(): + print(f"Error: {input_path} does not exist or is not a directory.") + return + + try: + shutil.copytree(input_path, output_path) + print(f"Directory {input_path} was successfully copied to {output_path}") + except Exception as e: + print(f"Error copying 
{input_path} to {output_path}: {e}") + + +if __name__ == "__main__": + if len(sys.argv) > 2: + input_path = Path(sys.argv[1]) + output_path = Path(sys.argv[2]) + copy_directory(input_path, output_path) + else: + print("Usage: script.py ") diff --git a/tests/flytekit/unit/core/write_flytefile.py b/tests/flytekit/unit/core/write_flytefile.py new file mode 100644 index 0000000000..893c1700ce --- /dev/null +++ b/tests/flytekit/unit/core/write_flytefile.py @@ -0,0 +1,17 @@ +import sys +from pathlib import Path + + +def copy_content_to_output(input_path: Path, output_path: Path): + content = input_path.read_text() + + output_path.write_text(content) + + +if __name__ == "__main__": + if len(sys.argv) > 2: + input_path = Path(sys.argv[1]) + output_path = Path(sys.argv[2]) + copy_content_to_output(input_path, output_path) + else: + print("Usage: script.py ")