Skip to content

Commit

Permalink
ShellTask: stores process return code, stdout and stderr for later us…
Browse files Browse the repository at this point in the history
…e. (flyteorg#2229)

Signed-off-by: Benoist LAURENT <[email protected]>
  • Loading branch information
benoistlaurent authored Mar 10, 2024
1 parent d5bc878 commit 96f2b2a
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 16 deletions.
46 changes: 33 additions & 13 deletions flytekit/extras/tasks/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import subprocess
import typing
from dataclasses import dataclass
from typing import List, Tuple
from typing import List

import flytekit
from flytekit.core.context_manager import ExecutionParameters
Expand All @@ -18,6 +18,21 @@
from flytekit.types.file import FlyteFile


@dataclass
class ProcessResult:
"""Stores a process return code, standard output and standard error.
Args:
returncode: int The sub-process return code
output: str The sub-process standard output string
error: str The sub-process standard error string
"""

returncode: int
output: str
error: str


@dataclass
class OutputLocation:
"""
Expand All @@ -34,7 +49,7 @@ class OutputLocation:
location: typing.Union[os.PathLike, str]


def subproc_execute(command: typing.Union[List[str], str], **kwargs) -> Tuple[str, str]:
def subproc_execute(command: typing.Union[List[str], str], **kwargs) -> ProcessResult:
"""
Execute a command and capture its stdout and stderr. Useful for executing
shell commands from within a python task.
Expand All @@ -43,7 +58,7 @@ def subproc_execute(command: typing.Union[List[str], str], **kwargs) -> Tuple[st
command (List[str]): The command to be executed as a list of strings.
Returns:
Tuple[str, str]: A tuple containing the stdout and stderr output of the command.
ProcessResult: Structure containing output of the command.
Raises:
Exception: If the command execution fails, this exception is raised with
Expand All @@ -66,7 +81,7 @@ def subproc_execute(command: typing.Union[List[str], str], **kwargs) -> Tuple[st
result = subprocess.run(command, **kwargs)

# Access the stdout and stderr output
return result.stdout, result.stderr
return ProcessResult(result.returncode, result.stdout, result.stderr)

except subprocess.CalledProcessError as e:
raise Exception(f"Command: {e.cmd}\nFailed with return code {e.returncode}:\n{e.stderr}")
Expand Down Expand Up @@ -145,7 +160,7 @@ def interpolate(
T = typing.TypeVar("T")


def _run_script(script: str, shell: str) -> typing.Tuple[int, str, str]:
def _run_script(script: str, shell: str) -> ProcessResult:
"""
Run script as a subprocess and return the returncode, stdout, and stderr.
Expand All @@ -156,8 +171,8 @@ def _run_script(script: str, shell: str) -> typing.Tuple[int, str, str]:
:type script: str
:param shell: shell to use to run the script
:type shell: str
:return: tuple containing the process returncode, stdout, and stderr
:rtype: typing.Tuple[int, str, str]
:return: structure containing the process returncode, stdout (stripped from carriage returns), and stderr
:rtype: ProcessResult
"""
process = subprocess.Popen(
script,
Expand All @@ -176,7 +191,7 @@ def _run_script(script: str, shell: str) -> typing.Tuple[int, str, str]:
out += line

code = process.wait()
return code, out, process_stderr
return ProcessResult(code, out, process_stderr)


class ShellTask(PythonInstanceTask[T]):
Expand Down Expand Up @@ -239,6 +254,7 @@ def __init__(
self._output_locs = output_locs if output_locs else []
self._interpolizer = _PythonFStringInterpolizer()
outputs = self._validate_output_locs()
self._process_result: typing.Optional[ProcessResult] = None
super().__init__(
name,
task_config,
Expand All @@ -263,6 +279,10 @@ def _validate_output_locs(self) -> typing.Dict[str, typing.Type]:
outputs[v.var] = v.var_type
return outputs

@property
def result(self) -> typing.Optional[ProcessResult]:
return self._process_result

@property
def script(self) -> typing.Optional[str]:
return self._script
Expand Down Expand Up @@ -303,15 +323,15 @@ def execute(self, **kwargs) -> typing.Any:
os.environ["ComSpec"] = "C:\\Windows\\System32\\cmd.exe"
self._shell = os.environ["ComSpec"]

returncode, stdout, stderr = _run_script(gen_script, self._shell)
if returncode != 0:
self._process_result = _run_script(gen_script, self._shell)
if self._process_result.returncode != 0:
files = os.listdir(".")
fstr = "\n-".join(files)
error = (
f"Failed to Execute Script, return-code {returncode} \n"
f"Failed to Execute Script, return-code {self._process_result.returncode} \n"
f"Current directory contents: .\n-{fstr}\n"
f"StdOut: {stdout}\n"
f"StdErr: {stderr}\n"
f"StdOut: {self._process_result.output}\n"
f"StdErr: {self._process_result.error}\n"
)
logger.error(error)
# raise FlyteRecoverableException so that it's classified as user error and will be retried
Expand Down
22 changes: 19 additions & 3 deletions tests/flytekit/unit/extras/tasks/test_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@
script_sh_2 = os.path.join(testdata, "script_args_env.sh")


def test_shell_task_access_to_result():
t = ShellTask(
name="test",
script="""
echo "Hello World!"
""",
shell="/bin/bash",
)
t()

assert t.result.returncode == 0
assert t.result.output == "Hello World!" # ShellTask strips carriage returns
assert t.result.error == ""


def test_shell_task_no_io():
t = ShellTask(
name="test",
Expand Down Expand Up @@ -342,9 +357,10 @@ def test_long_run_script():

def test_subproc_execute():
cmd = ["echo", "hello"]
o, e = subproc_execute(cmd)
assert o == "hello\n"
assert e == ""
result = subproc_execute(cmd)
assert result.returncode == 0
assert result.output == "hello\n"
assert result.error == ""


def test_subproc_execute_with_shell():
Expand Down

0 comments on commit 96f2b2a

Please sign in to comment.