Skip to content

Commit

Permalink
Restore function scoped httpx import in file_task_handler for perf (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
vchiapaikeo authored Jan 12, 2024
1 parent fd68f05 commit c792b25
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from typing import TYPE_CHECKING, Any, Callable, Iterable
from urllib.parse import urljoin

import httpx
import pendulum

from airflow.configuration import conf
Expand Down Expand Up @@ -79,6 +78,9 @@ def _set_task_deferred_context_var():


def _fetch_logs_from_service(url, log_relative_path):
# Import occurs in function scope for perf. Ref: https://github.com/apache/airflow/pull/21438
import httpx

from airflow.utils.jwt_signer import JWTSigner

timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None)
Expand Down Expand Up @@ -557,7 +559,9 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], li
messages.append(f"Found logs served from host {url}")
logs.append(response.text)
except Exception as e:
if isinstance(e, httpx.UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True:
from httpx import UnsupportedProtocol

if isinstance(e, UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True:
messages.append(self.inherits_from_empty_operator_log_message)
else:
messages.append(f"Could not read served logs: {e}")
Expand Down

0 comments on commit c792b25

Please sign in to comment.