Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FT-864 Add logic for open telementry logging. #459

Merged
merged 13 commits into from
Dec 31, 2024
102 changes: 101 additions & 1 deletion pyatlan/pkg/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import logging
import os
from typing import Dict, List
from typing import Any, Dict, List, Mapping, Optional, Sequence, Union

from pydantic.v1 import parse_obj_as, parse_raw_as

Expand All @@ -12,6 +12,56 @@

LOGGER = logging.getLogger(__name__)

# Try to import OpenTelemetry libraries
try:
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( # type:ignore
OTLPLogExporter,
)
from opentelemetry.sdk._logs import ( # type:ignore
LogData,
LoggerProvider,
LoggingHandler,
)
from opentelemetry.sdk._logs._internal.export import ( # type:ignore
BatchLogRecordProcessor,
)
from opentelemetry.sdk.resources import Resource # type:ignore
ErnestoLoma marked this conversation as resolved.
Show resolved Hide resolved

class CustomBatchLogRecordProcessor(BatchLogRecordProcessor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def emit(self, log_data: LogData) -> None:
if not self._is_valid_type(log_data.log_record.body):
log_data.log_record.body = str(log_data.log_record.body)
super().emit(log_data)

def _is_valid_type(self, value: Any) -> bool:
# see https://github.com/open-telemetry/opentelemetry-python/blob/c883f6cc1243ab7e0e5bc177169f25cdf0aac29f/
# exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal
# /__init__.py#L69
# for valid encode types
if isinstance(value, bool):
return True
if isinstance(value, str):
return True
if isinstance(value, int):
return True
if isinstance(value, float):
return True
if isinstance(value, Sequence):
return all(self._is_valid_type(v) for v in value)
elif isinstance(value, Mapping):
return all(
self._is_valid_type(k) & self._is_valid_type(v)
for k, v in value.items()
)
return False

OTEL_IMPORTS_AVAILABLE = True
except ImportError:
OTEL_IMPORTS_AVAILABLE = False


def get_client(impersonate_user_id: str) -> AtlanClient:
"""
Expand Down Expand Up @@ -117,3 +167,53 @@ def validate_connector_and_connection(v):
from pyatlan.pkg.models import ConnectorAndConnection

return parse_raw_as(ConnectorAndConnection, v)


def has_handler(logger: logging.Logger, handler_class) -> bool:
"""
Checks if a logger or its ancestor has a handler of a specific class. The function
iterates through the logger's handlers and optionally ascends the logger hierarchy,
checking each logger's handlers for an instance of the specified handler class.

Args:
logger (logging.Logger): The logger to inspect for the handler.
handler_class: The class of the handler to look for.

Returns:
bool: True if the handler of the specified class is found, False otherwise.
"""
c: Optional[logging.Logger] = logger
while c:
for hdlr in c.handlers:
if isinstance(hdlr, handler_class):
return True
c = c.parent if c.propagate else None
return False


def add_otel_handler(
logger: logging.Logger, level: Union[int, str], resource: dict
) -> None:
"""
Adds an OpenTelemetry handler to the logger if not already present.

Args:
logger (logging.Logger): The logger to which the handler will be added.
level (int | str): The logging level.
resource (dict): A dictionary of resource attributes to be associated with the logger.
"""
if OTEL_IMPORTS_AVAILABLE and not has_handler(logger, LoggingHandler):
if workflow_node_name := os.getenv("OTEL_WF_NODE_NAME", ""):
resource["k8s.workflow.node.name"] = workflow_node_name
logger_provider = LoggerProvider(Resource.create(resource))
otel_log_exporter = OTLPLogExporter(
endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"), insecure=True
)
logger_provider.add_log_record_processor(
CustomBatchLogRecordProcessor(otel_log_exporter)
)

otel_handler = LoggingHandler(level=level, logger_provider=logger_provider)
otel_handler.setLevel(level)
logger.addHandler(otel_handler)
logger.info("OpenTelemetry handler added to the logger.")
Loading