From dc08893c906b6b2c57a9ec859d5d6ab329af5b30 Mon Sep 17 00:00:00 2001 From: Antoine Tavant Date: Mon, 8 Jul 2024 17:39:57 +0200 Subject: [PATCH] DockerOperator TaskFlow - correct argyments in python command (#39620) --------- Co-authored-by: Antoine TAVANT --- airflow/providers/docker/decorators/docker.py | 2 +- .../docker/decorators/test_docker.py | 33 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/airflow/providers/docker/decorators/docker.py b/airflow/providers/docker/decorators/docker.py index 9aafdd1d79bfb..d851c98aca5d9 100644 --- a/airflow/providers/docker/decorators/docker.py +++ b/airflow/providers/docker/decorators/docker.py @@ -89,7 +89,7 @@ def generate_command(self): f"""bash -cx '{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py", self.python_command)} &&""" f'{_generate_decode_command("__PYTHON_INPUT", "/tmp/script.in", self.python_command)} &&' - f"{self.python_command} /tmp/script.py /tmp/script.in /tmp/script.out'" + f"{self.python_command} /tmp/script.py /tmp/script.in /tmp/script.out none /tmp/script.out'" ) def execute(self, context: Context): diff --git a/tests/providers/docker/decorators/test_docker.py b/tests/providers/docker/decorators/test_docker.py index e4fbe15fc3243..93db9f211b4db 100644 --- a/tests/providers/docker/decorators/test_docker.py +++ b/tests/providers/docker/decorators/test_docker.py @@ -16,6 +16,9 @@ # under the License. from __future__ import annotations +import logging +from io import StringIO as StringBuffer + import pytest from airflow.decorators import setup, task, teardown @@ -284,3 +287,33 @@ def f(): ret = f() assert ret.operator.docker_url == "unix://var/run/docker.sock" + + def test_failing_task(self, dag_maker): + """Test regression #39319 + + Check the log content of the DockerOperator when the task fails. + """ + + @task.docker(image="python:3.9-slim", auto_remove="force") + def f(): + raise ValueError("This task is expected to fail") + + docker_operator_logger_name = "airflow.task.operators" + + docker_operator_logger = logging.getLogger(docker_operator_logger_name) + log_capture_string = StringBuffer() + ch = logging.StreamHandler(log_capture_string) + docker_operator_logger.addHandler(ch) + with dag_maker(): + ret = f() + + dr = dag_maker.create_dagrun() + with pytest.raises(AirflowException): + ret.operator.run(start_date=dr.execution_date, end_date=dr.execution_date) + ti = dr.get_task_instances()[0] + assert ti.state == TaskInstanceState.FAILED + + log_content = str(log_capture_string.getvalue()) + assert 'with open(sys.argv[4], "w") as file:' not in log_content + last_line_of_docker_operator_log = log_content.splitlines()[-1] + assert "ValueError: This task is expected to fail" in last_line_of_docker_operator_log