Skip to content

Commit

Permalink
Make task output "unbuffered" so output is captured straight away (ap…
Browse files Browse the repository at this point in the history
…ache#44186)

Without this change a dag like this:

```
@task()
def hello():
    print("hello")
    time.sleep(300)
    print("goodbye")
```

would not show the output for "hello" until after the sleep!

This is analogouys to setting PYTHONUNBUFFERED environment variable when
running something like `python script.py | cat` etc.
  • Loading branch information
ashb authored Nov 19, 2024
1 parent 87337da commit 24811f7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 16 deletions.
5 changes: 3 additions & 2 deletions task_sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ def _reopen_std_io_handles(child_stdin, child_stdout, child_stderr):
fd = sock.fileno()
else:
raise

setattr(sys, handle_name, os.fdopen(fd, mode))
# We can't open text mode fully unbuffered (python throws an exception if we try), but we can make it line buffered with `buffering=1`
handle = os.fdopen(fd, mode, buffering=1)
setattr(sys, handle_name, handle)


def _fork_main(
Expand Down
28 changes: 14 additions & 14 deletions task_sdk/tests/execution_time/test_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,30 +51,30 @@ def test_reading_from_pipes(self, captured_logs, time_machine):
# Ignore anything lower than INFO for this test. Captured_logs resets things for us afterwards
structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.INFO))

line = lineno()

def subprocess_main():
# This is run in the subprocess!

# Ensure we follow the "protocol" and get the startup message before we do anything
sys.stdin.readline()

# Flush calls are to ensure ordering of output for predictable tests
import logging
import warnings

print("I'm a short message")
sys.stdout.write("Message ")
sys.stdout.write("split across two writes\n")
sys.stdout.flush()

print("stderr message", file=sys.stderr)
sys.stderr.flush()
# We need a short sleep for the main process to process things. I worry this timining will be
# fragile, but I can't think of a better way. This lets the stdout be read (partial line) and the
# stderr full line be read
sleep(0.1)
sys.stdout.write("split across two writes\n")

logging.getLogger("airflow.foobar").error("An error message")

warnings.warn("Warning should be captured too", stacklevel=1)

line = lineno() - 2 # Line the error should be on

instant = tz.datetime(2024, 11, 7, 12, 34, 56, 78901)
time_machine.move_to(instant, tick=False)

Expand Down Expand Up @@ -103,16 +103,16 @@ def subprocess_main():
"timestamp": "2024-11-07T12:34:56.078901Z",
},
{
"chan": "stdout",
"event": "Message split across two writes",
"level": "info",
"chan": "stderr",
"event": "stderr message",
"level": "error",
"logger": "task",
"timestamp": "2024-11-07T12:34:56.078901Z",
},
{
"chan": "stderr",
"event": "stderr message",
"level": "error",
"chan": "stdout",
"event": "Message split across two writes",
"level": "info",
"logger": "task",
"timestamp": "2024-11-07T12:34:56.078901Z",
},
Expand All @@ -127,7 +127,7 @@ def subprocess_main():
"event": "Warning should be captured too",
"filename": __file__,
"level": "warning",
"lineno": line + 22,
"lineno": line,
"logger": "py.warnings",
"timestamp": instant.replace(tzinfo=None),
},
Expand Down

0 comments on commit 24811f7

Please sign in to comment.