Skip to content

Commit

Permalink
DockerOperator TaskFlow - correct argyments in python command (apache…
Browse files Browse the repository at this point in the history
…#39620)


---------

Co-authored-by: Antoine TAVANT <[email protected]>
  • Loading branch information
antoinetavant and Antoine TAVANT authored Jul 8, 2024
1 parent 18d2d14 commit dc08893
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/providers/docker/decorators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def generate_command(self):
f"""bash -cx '{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py",
self.python_command)} &&"""
f'{_generate_decode_command("__PYTHON_INPUT", "/tmp/script.in", self.python_command)} &&'
f"{self.python_command} /tmp/script.py /tmp/script.in /tmp/script.out'"
f"{self.python_command} /tmp/script.py /tmp/script.in /tmp/script.out none /tmp/script.out'"
)

def execute(self, context: Context):
Expand Down
33 changes: 33 additions & 0 deletions tests/providers/docker/decorators/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# under the License.
from __future__ import annotations

import logging
from io import StringIO as StringBuffer

import pytest

from airflow.decorators import setup, task, teardown
Expand Down Expand Up @@ -284,3 +287,33 @@ def f():
ret = f()

assert ret.operator.docker_url == "unix://var/run/docker.sock"

def test_failing_task(self, dag_maker):
"""Test regression #39319
Check the log content of the DockerOperator when the task fails.
"""

@task.docker(image="python:3.9-slim", auto_remove="force")
def f():
raise ValueError("This task is expected to fail")

docker_operator_logger_name = "airflow.task.operators"

docker_operator_logger = logging.getLogger(docker_operator_logger_name)
log_capture_string = StringBuffer()
ch = logging.StreamHandler(log_capture_string)
docker_operator_logger.addHandler(ch)
with dag_maker():
ret = f()

dr = dag_maker.create_dagrun()
with pytest.raises(AirflowException):
ret.operator.run(start_date=dr.execution_date, end_date=dr.execution_date)
ti = dr.get_task_instances()[0]
assert ti.state == TaskInstanceState.FAILED

log_content = str(log_capture_string.getvalue())
assert 'with open(sys.argv[4], "w") as file:' not in log_content
last_line_of_docker_operator_log = log_content.splitlines()[-1]
assert "ValueError: This task is expected to fail" in last_line_of_docker_operator_log

0 comments on commit dc08893

Please sign in to comment.