diff --git a/pyproject.toml b/pyproject.toml index 1bb65b0fade..8c8e36192e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,8 @@ dependencies = [ "openpyxl", # extra dependency for pandas (excel) "opentelemetry-api", "opentelemetry-sdk", + "opentelemetry.instrumentation.fastapi", + "opentelemetry-instrumentation-httpx", "opentelemetry-instrumentation-threading", "orjson", "packaging", diff --git a/src/ert/dark_storage/app.py b/src/ert/dark_storage/app.py index 03bcf7ef9fc..95b2e402a57 100644 --- a/src/ert/dark_storage/app.py +++ b/src/ert/dark_storage/app.py @@ -6,6 +6,7 @@ from fastapi import FastAPI, Request, status from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html from fastapi.responses import HTMLResponse, RedirectResponse, Response +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from ert.dark_storage.endpoints import router as endpoints_router from ert.dark_storage.exceptions import ErtStorageError @@ -107,3 +108,5 @@ async def healthcheck() -> str: app.include_router(endpoints_router) + +FastAPIInstrumentor.instrument_app(app) diff --git a/src/ert/logging/storage_log.conf b/src/ert/logging/storage_log.conf index d850e528a49..68affbf98d3 100644 --- a/src/ert/logging/storage_log.conf +++ b/src/ert/logging/storage_log.conf @@ -32,7 +32,7 @@ loggers: ert.shared.storage.info: level: INFO handlers: [infohandler, file] - propagate: False + propagate: True ert.shared.status: level: INFO res: diff --git a/src/ert/services/_storage_main.py b/src/ert/services/_storage_main.py index 9434a2babf4..14b1626b560 100644 --- a/src/ert/services/_storage_main.py +++ b/src/ert/services/_storage_main.py @@ -7,13 +7,16 @@ import signal import socket import string +import sys import threading import time import warnings +from types import FrameType from typing import Any import uvicorn import yaml +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from uvicorn.supervisors import ChangeReload 
from ert.logging import STORAGE_LOG_CONFIG @@ -21,6 +24,9 @@ from ert.shared import __file__ as ert_shared_path from ert.shared import find_available_socket from ert.shared.storage.command import add_parser_options +from ert.trace import tracer, tracer_provider + +DARK_STORAGE_APP = "ert.dark_storage.app:app" class Server(uvicorn.Server): @@ -81,7 +87,11 @@ def _create_connection_info(sock: socket.socket, authtoken: str) -> dict[str, An return connection_info -def run_server(args: argparse.Namespace | None = None, debug: bool = False) -> None: +def run_server( + args: argparse.Namespace | None = None, + debug: bool = False, + uvicorn_config: uvicorn.Config | None = None, +) -> None: if args is None: args = parse_args() @@ -102,12 +112,16 @@ def run_server(args: argparse.Namespace | None = None, debug: bool = False) -> N # Appropriated from uvicorn.main:run os.environ["ERT_STORAGE_NO_TOKEN"] = "1" os.environ["ERT_STORAGE_ENS_PATH"] = os.path.abspath(args.project) - config = uvicorn.Config("ert.dark_storage.app:app", **config_args) + config = ( + uvicorn.Config(DARK_STORAGE_APP, **config_args) + if uvicorn_config is None + else uvicorn_config + ) # uvicorn.Config() resets the logging config (overriding additional handlers added to loggers, e.g. the ert_azurelogger handler added through the plugin system) server = Server(config, json.dumps(connection_info)) logger = logging.getLogger("ert.shared.storage.info") - log_level = logging.INFO if args.verbose else logging.WARNING - logger.setLevel(log_level) + if args.verbose and logger.level > logging.INFO: + logger.setLevel(logging.INFO) logger.info("Storage server is ready to accept requests.
Listening on:") for url in connection_info["urls"]: logger.info(f" {url}") @@ -143,21 +157,53 @@ def check_parent_alive() -> bool: os.kill(os.getpid(), signal.SIGTERM) -if __name__ == "__main__": +def main() -> None: + args = parse_args() + config_args: dict[str, Any] = {} with open(STORAGE_LOG_CONFIG, encoding="utf-8") as conf_file: logging_conf = yaml.safe_load(conf_file) logging.config.dictConfig(logging_conf) + config_args.update(log_config=logging_conf) warnings.filterwarnings("ignore", category=DeprecationWarning) - uvicorn.config.LOGGING_CONFIG.clear() - uvicorn.config.LOGGING_CONFIG.update(logging_conf) - _stopped = threading.Event() + + if args.debug: + config_args.update(reload=True, reload_dirs=[os.path.dirname(ert_shared_path)]) + uvicorn_config = uvicorn.Config( + DARK_STORAGE_APP, **config_args + ) # Need to run uvicorn.Config before entering the ErtPluginContext because uvicorn.Config overrides the configuration of existing loggers, thus removing log handlers added by ErtPluginContext + + ctx = ( + TraceContextTextMapPropagator().extract( + carrier={"traceparent": args.traceparent} + ) + if args.traceparent + else None + ) + + stopped = threading.Event() terminate_on_parent_death_thread = threading.Thread( - target=terminate_on_parent_death, args=[_stopped, 1.0] + target=terminate_on_parent_death, args=[stopped, 1.0] ) - with ErtPluginContext(logger=logging.getLogger()) as context: + with ErtPluginContext(logger=logging.getLogger(), trace_provider=tracer_provider): terminate_on_parent_death_thread.start() - try: - run_server(debug=False) - finally: - _stopped.set() - terminate_on_parent_death_thread.join() + with tracer.start_as_current_span("run_storage_server", ctx): + logger = logging.getLogger("ert.shared.storage.info") + try: + logger.info("Starting dark storage") + run_server(args, debug=False, uvicorn_config=uvicorn_config) + except SystemExit: + logger.info("Stopping dark storage") + finally: + stopped.set() + 
terminate_on_parent_death_thread.join() + + +def sigterm_handler(_signo: int, _stack_frame: FrameType | None) -> None: + sys.exit(0) + + +signal.signal(signal.SIGTERM, sigterm_handler) + + +if __name__ == "__main__": + main() diff --git a/src/ert/services/storage_service.py b/src/ert/services/storage_service.py index 734ecce5c91..10a6b4c7018 100644 --- a/src/ert/services/storage_service.py +++ b/src/ert/services/storage_service.py @@ -6,9 +6,13 @@ import httpx import requests +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor from ert.dark_storage.client import Client, ConnInfo from ert.services._base_service import BaseService, _Context, local_exec_args +from ert.trace import get_traceparent + +HTTPXClientInstrumentor().instrument() class StorageService(BaseService): @@ -21,6 +25,7 @@ def __init__( conn_info: Mapping[str, Any] | Exception | None = None, project: str | None = None, verbose: bool = False, + traceparent: str | None = "inherit_parent", ): self._url: str | None = None @@ -29,6 +34,11 @@ def __init__( exec_args.extend(["--project", str(project)]) if verbose: exec_args.append("--verbose") + if traceparent: + traceparent = ( + get_traceparent() if traceparent == "inherit_parent" else traceparent + ) + exec_args.extend(["--traceparent", str(traceparent)]) super().__init__(exec_args, timeout, conn_info, project) diff --git a/src/ert/shared/storage/command.py b/src/ert/shared/storage/command.py index 05bb63e9fbb..73be6dcd75b 100644 --- a/src/ert/shared/storage/command.py +++ b/src/ert/shared/storage/command.py @@ -16,6 +16,12 @@ def add_parser_options(ap: ArgumentParser) -> None: help="Path to directory in which to create storage_server.json", default=os.getcwd(), ) + ap.add_argument( + "--traceparent", + type=str, + help="Trace parent id to be used by the storage root span", + default=None, + ) ap.add_argument( "--host", type=str, default=os.environ.get("ERT_STORAGE_HOST", "127.0.0.1") ) diff --git a/src/ert/trace.py b/src/ert/trace.py 
index c7a24b48b1c..8e7b6fcc70a 100644 --- a/src/ert/trace.py +++ b/src/ert/trace.py @@ -1,6 +1,7 @@ from opentelemetry import trace from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import SpanLimits, TracerProvider +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator resource = Resource(attributes={SERVICE_NAME: "ert"}) tracer_provider = TracerProvider( @@ -13,3 +14,10 @@ def get_trace_id() -> str: return trace.format_trace_id(trace.get_current_span().get_span_context().trace_id) + + +def get_traceparent() -> str | None: + carrier: dict[str, str] = {} + # Write the current context into the carrier. + TraceContextTextMapPropagator().inject(carrier) + return carrier.get("traceparent")