diff --git a/airflow/traces/tracer.py b/airflow/traces/tracer.py index 79272d43d00bb..f116b02ef3b9b 100644 --- a/airflow/traces/tracer.py +++ b/airflow/traces/tracer.py @@ -17,9 +17,9 @@ # under the License. from __future__ import annotations -import inspect import logging import socket +from functools import wraps from typing import TYPE_CHECKING, Any, Callable from airflow.configuration import conf @@ -44,20 +44,15 @@ def gen_links_from_kv_list(list): def add_span(func): """Decorate a function with span.""" + func_name = func.__name__ + qual_name = func.__qualname__ + module_name = func.__module__ + component = qual_name.rsplit(".", 1)[0] if "." in qual_name else module_name + @wraps(func) def wrapper(*args, **kwargs): - func_name = func.__name__ - qual_name = func.__qualname__ - module_name = func.__module__ - if "." in qual_name: - component = f"{qual_name.rsplit('.', 1)[0]}" - else: - component = module_name with Trace.start_span(span_name=func_name, component=component): - if len(inspect.signature(func).parameters) > 0: - return func(*args, **kwargs) - else: - return func() + return func(*args, **kwargs) return wrapper @@ -65,8 +60,7 @@ def wrapper(*args, **kwargs): class EmptyContext: """If no Tracer is configured, EmptyContext is used as a fallback.""" - def __init__(self): - self.trace_id = 1 + trace_id = 1 class EmptySpan: @@ -243,41 +237,55 @@ def start_span_from_taskinstance( return EMPTY_SPAN -class _Trace(type): - factory: Callable +class _TraceMeta(type): + factory: Callable[[], Tracer] | None = None instance: Tracer | EmptyTrace | None = None - def __getattr__(cls, name: str) -> str: + def __getattr__(cls, name: str): + if not cls.factory: + # Lazy initialization of the factory + cls.configure_factory() if not cls.instance: - try: - cls.instance = cls.factory() - except (socket.gaierror, ImportError) as e: - log.error("Could not configure Trace: %s, using EmptyTrace instead.", e) - cls.instance = EmptyTrace() + cls._initialize_instance() return getattr(cls.instance, name) - def __init__(cls, *args, **kwargs) -> None: - super().__init__(cls) - if not hasattr(cls.__class__, "factory"): - if conf.has_option("traces", "otel_on") and conf.getboolean("traces", "otel_on"): - from airflow.traces import otel_tracer + def _initialize_instance(cls): + """Initialize the trace instance.""" + try: + cls.instance = cls.factory() + except (socket.gaierror, ImportError) as e: + log.error("Could not configure Trace: %s. Using EmptyTrace instead.", e) + cls.instance = EmptyTrace() + + def __call__(cls, *args, **kwargs): + """Ensure the class behaves as a singleton.""" + if not cls.instance: + cls._initialize_instance() + return cls.instance + + @classmethod + def configure_factory(cls): + """Configure the trace factory based on settings.""" + if conf.has_option("traces", "otel_on") and conf.getboolean("traces", "otel_on"): + from airflow.traces import otel_tracer - cls.__class__.factory = otel_tracer.get_otel_tracer - else: - cls.__class__.factory = EmptyTrace + cls.factory = otel_tracer.get_otel_tracer + else: + # EmptyTrace is a class and not inherently callable. + # Using a lambda ensures it can be invoked as a callable factory. + # staticmethod ensures the lambda is treated as a standalone function + # and avoids passing `cls` as an implicit argument. + cls.factory = staticmethod(lambda: EmptyTrace()) @classmethod def get_constant_tags(cls) -> str | None: """Get constant tags to add to all traces.""" - tags_in_string = conf.get("traces", "tags", fallback=None) - if not tags_in_string: - return None - return tags_in_string + return conf.get("traces", "tags", fallback=None) if TYPE_CHECKING: Trace: EmptyTrace else: - class Trace(metaclass=_Trace): + class Trace(metaclass=_TraceMeta): """Empty class for Trace - we use metaclass to inject the right one."""