Skip to content

Commit

Permalink
Refactor implementation to get OTel tracer (apache#44216)
Browse files Browse the repository at this point in the history
Previous implementation had redundant logic in add_span for parameter inspection:

```python
with Trace.start_span(span_name=func_name, component=component):
    if len(inspect.signature(func).parameters) > 0:
        return func(*args, **kwargs)
    else:
        return func()
```

`_Trace` Metaclass was using `__init__` and few more Python-related changes
  • Loading branch information
kaxil authored Nov 20, 2024
1 parent 8554001 commit d43052e
Showing 1 changed file with 43 additions and 35 deletions.
78 changes: 43 additions & 35 deletions airflow/traces/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
# under the License.
from __future__ import annotations

import inspect
import logging
import socket
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable

from airflow.configuration import conf
Expand All @@ -44,29 +44,23 @@ def gen_links_from_kv_list(list):

def add_span(func):
"""Decorate a function with span."""
func_name = func.__name__
qual_name = func.__qualname__
module_name = func.__module__
component = qual_name.rsplit(".", 1)[0] if "." in qual_name else module_name

@wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__name__
qual_name = func.__qualname__
module_name = func.__module__
if "." in qual_name:
component = f"{qual_name.rsplit('.', 1)[0]}"
else:
component = module_name
with Trace.start_span(span_name=func_name, component=component):
if len(inspect.signature(func).parameters) > 0:
return func(*args, **kwargs)
else:
return func()
return func(*args, **kwargs)

return wrapper


class EmptyContext:
"""If no Tracer is configured, EmptyContext is used as a fallback."""

def __init__(self):
self.trace_id = 1
trace_id = 1


class EmptySpan:
Expand Down Expand Up @@ -243,41 +237,55 @@ def start_span_from_taskinstance(
return EMPTY_SPAN


class _Trace(type):
factory: Callable
class _TraceMeta(type):
factory: Callable[[], Tracer] | None = None
instance: Tracer | EmptyTrace | None = None

def __getattr__(cls, name: str) -> str:
def __getattr__(cls, name: str):
if not cls.factory:
# Lazy initialization of the factory
cls.configure_factory()
if not cls.instance:
try:
cls.instance = cls.factory()
except (socket.gaierror, ImportError) as e:
log.error("Could not configure Trace: %s, using EmptyTrace instead.", e)
cls.instance = EmptyTrace()
cls._initialize_instance()
return getattr(cls.instance, name)

def __init__(cls, *args, **kwargs) -> None:
super().__init__(cls)
if not hasattr(cls.__class__, "factory"):
if conf.has_option("traces", "otel_on") and conf.getboolean("traces", "otel_on"):
from airflow.traces import otel_tracer
def _initialize_instance(cls):
"""Initialize the trace instance."""
try:
cls.instance = cls.factory()
except (socket.gaierror, ImportError) as e:
log.error("Could not configure Trace: %s. Using EmptyTrace instead.", e)
cls.instance = EmptyTrace()

def __call__(cls, *args, **kwargs):
"""Ensure the class behaves as a singleton."""
if not cls.instance:
cls._initialize_instance()
return cls.instance

@classmethod
def configure_factory(cls):
"""Configure the trace factory based on settings."""
if conf.has_option("traces", "otel_on") and conf.getboolean("traces", "otel_on"):
from airflow.traces import otel_tracer

cls.__class__.factory = otel_tracer.get_otel_tracer
else:
cls.__class__.factory = EmptyTrace
cls.factory = otel_tracer.get_otel_tracer
else:
# EmptyTrace is a class and not inherently callable.
# Using a lambda ensures it can be invoked as a callable factory.
# staticmethod ensures the lambda is treated as a standalone function
# and avoids passing `cls` as an implicit argument.
cls.factory = staticmethod(lambda: EmptyTrace())

@classmethod
def get_constant_tags(cls) -> str | None:
"""Get constant tags to add to all traces."""
tags_in_string = conf.get("traces", "tags", fallback=None)
if not tags_in_string:
return None
return tags_in_string
return conf.get("traces", "tags", fallback=None)


if TYPE_CHECKING:
Trace: EmptyTrace
else:

class Trace(metaclass=_Trace):
class Trace(metaclass=_TraceMeta):
"""Empty class for Trace - we use metaclass to inject the right one."""

0 comments on commit d43052e

Please sign in to comment.