From dfe08a7105e2177df73f7eefdd7fd2e02a5c4ff9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Wed, 18 Dec 2024 15:08:57 +0100 Subject: [PATCH 1/5] WIP: Add spans to dark storage Add opentelemetry.instrumentation on server and client side Fix azure log handler overriden by uvicorn.Config --- pyproject.toml | 2 ++ src/ert/dark_storage/app.py | 4 +++ src/ert/services/_storage_main.py | 53 ++++++++++++++++++++++------- src/ert/services/storage_service.py | 2 ++ 4 files changed, 49 insertions(+), 12 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1bb65b0fade..8c8e36192e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,8 @@ dependencies = [ "openpyxl", # extra dependency for pandas (excel) "opentelemetry-api", "opentelemetry-sdk", + "opentelemetry.instrumentation.fastapi", + "opentelemetry-instrumentation-httpx", "opentelemetry-instrumentation-threading", "orjson", "packaging", diff --git a/src/ert/dark_storage/app.py b/src/ert/dark_storage/app.py index 03bcf7ef9fc..760564620e3 100644 --- a/src/ert/dark_storage/app.py +++ b/src/ert/dark_storage/app.py @@ -10,6 +10,8 @@ from ert.dark_storage.endpoints import router as endpoints_router from ert.dark_storage.exceptions import ErtStorageError +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor + class JSONEncoder(json.JSONEncoder): """ @@ -107,3 +109,5 @@ async def healthcheck() -> str: app.include_router(endpoints_router) + +FastAPIInstrumentor.instrument_app(app) \ No newline at end of file diff --git a/src/ert/services/_storage_main.py b/src/ert/services/_storage_main.py index 9434a2babf4..b21d56022fc 100644 --- a/src/ert/services/_storage_main.py +++ b/src/ert/services/_storage_main.py @@ -7,6 +7,7 @@ import signal import socket import string +import sys import threading import time import warnings @@ -21,6 +22,11 @@ from ert.shared import __file__ as ert_shared_path from ert.shared import find_available_socket from ert.shared.storage.command import add_parser_options +from ert.trace import get_trace_id, trace, tracer, tracer_provider + +from opentelemetry.trace.span import Span + +DARK_STORAGE_APP = "ert.dark_storage.app:app" class Server(uvicorn.Server): @@ -80,8 +86,8 @@ def _create_connection_info(sock: socket.socket, authtoken: str) -> dict[str, An return connection_info - -def run_server(args: argparse.Namespace | None = None, debug: bool = False) -> None: +def run_server(args: argparse.Namespace | None = None, debug: bool = False, uvicorn_config = None) -> None: + trace_id = get_trace_id() if args is None: args = parse_args() @@ -102,7 +108,7 @@ def run_server(args: argparse.Namespace | None = None, debug: bool = False) -> N # Appropriated from uvicorn.main:run os.environ["ERT_STORAGE_NO_TOKEN"] = "1" os.environ["ERT_STORAGE_ENS_PATH"] = os.path.abspath(args.project) - config = uvicorn.Config("ert.dark_storage.app:app", **config_args) + config = uvicorn.Config(DARK_STORAGE_APP, **config_args) if uvicorn_config is None else uvicorn_config #uvicorn.Config() resets the logging config (overriding additional handlers added to loggers like e.g. the ert_azurelogger handler added through the pluggin system server = Server(config, json.dumps(connection_info)) logger = logging.getLogger("ert.shared.storage.info") @@ -143,21 +149,44 @@ def check_parent_alive() -> bool: os.kill(os.getpid(), signal.SIGTERM) -if __name__ == "__main__": +def main(): + args = parse_args() + config_args: Dict[str, Any] = {} with open(STORAGE_LOG_CONFIG, encoding="utf-8") as conf_file: logging_conf = yaml.safe_load(conf_file) logging.config.dictConfig(logging_conf) + config_args.update(log_config=logging_conf) warnings.filterwarnings("ignore", category=DeprecationWarning) - uvicorn.config.LOGGING_CONFIG.clear() - uvicorn.config.LOGGING_CONFIG.update(logging_conf) + + if args.debug: + config_args.update(reload=True, reload_dirs=[os.path.dirname(ert_shared_path)]) + uvicorn_config = uvicorn.Config(DARK_STORAGE_APP, **config_args) # Need to run uvicorn.Config before entering the ErtPluginContext because uvicorn.Config overrides the configuration of existing loggers, thus removing log handlers added by ErtPluginContext + _stopped = threading.Event() terminate_on_parent_death_thread = threading.Thread( target=terminate_on_parent_death, args=[_stopped, 1.0] ) - with ErtPluginContext(logger=logging.getLogger()) as context: + with ErtPluginContext(logger=logging.getLogger(), trace_provider=tracer_provider) as context: terminate_on_parent_death_thread.start() - try: - run_server(debug=False) - finally: - _stopped.set() - terminate_on_parent_death_thread.join() + with tracer.start_as_current_span(f"run_storage_server") as currentSpan: + try: + print(f"Opertation ID: {get_trace_id()}") + run_server(args, debug=False, uvicorn_config = uvicorn_config) + except BaseException as err: + print(f"Stopped with exception {err}") + finally: + _stopped.set() + terminate_on_parent_death_thread.join() + print("Closing2") + + + +def sigterm_handler(_signo, _stack_frame): + print("handle sigterm") + sys.exit(0) + +signal.signal(signal.SIGTERM, sigterm_handler) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/ert/services/storage_service.py b/src/ert/services/storage_service.py index 734ecce5c91..38914415fc8 100644 --- a/src/ert/services/storage_service.py +++ b/src/ert/services/storage_service.py @@ -10,6 +10,8 @@ from ert.dark_storage.client import Client, ConnInfo from ert.services._base_service import BaseService, _Context, local_exec_args +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +HTTPXClientInstrumentor().instrument() class StorageService(BaseService): service_name = "storage" From f101af6fb033cc8b63cd67b01003bcb6e9e6a188 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Wed, 18 Dec 2024 15:08:57 +0100 Subject: [PATCH 2/5] Pass ert trace context to dark storage --- src/ert/services/_storage_main.py | 21 +++++++++++---------- src/ert/services/storage_service.py | 5 +++++ src/ert/shared/storage/command.py | 6 ++++++ src/ert/trace.py | 7 +++++++ 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/ert/services/_storage_main.py b/src/ert/services/_storage_main.py index b21d56022fc..21b03f37955 100644 --- a/src/ert/services/_storage_main.py +++ b/src/ert/services/_storage_main.py @@ -22,8 +22,9 @@ from ert.shared import __file__ as ert_shared_path from ert.shared import find_available_socket from ert.shared.storage.command import add_parser_options -from ert.trace import get_trace_id, trace, tracer, tracer_provider +from ert.trace import tracer, tracer_provider +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from opentelemetry.trace.span import Span DARK_STORAGE_APP = "ert.dark_storage.app:app" @@ -87,7 +88,6 @@ def _create_connection_info(sock: socket.socket, authtoken: str) -> dict[str, An return connection_info def run_server(args: argparse.Namespace | None = None, debug: bool = False, uvicorn_config = None) -> None: - trace_id = get_trace_id() if args is None: args = parse_args() @@ -112,8 +112,8 @@ def run_server(args: argparse.Namespace | None = None, debug: bool = False, uvic server = Server(config, json.dumps(connection_info)) logger = logging.getLogger("ert.shared.storage.info") - log_level = logging.INFO if args.verbose else logging.WARNING - logger.setLevel(log_level) + if args.verbose and logger.level > logging.INFO: + logger.setLevel(logging.INFO) logger.info("Storage server is ready to accept requests. Listening on:") for url in connection_info["urls"]: logger.info(f" {url}") @@ -162,27 +162,28 @@ def main(): config_args.update(reload=True, reload_dirs=[os.path.dirname(ert_shared_path)]) uvicorn_config = uvicorn.Config(DARK_STORAGE_APP, **config_args) # Need to run uvicorn.Config before entering the ErtPluginContext because uvicorn.Config overrides the configuration of existing loggers, thus removing log handlers added by ErtPluginContext + ctx = TraceContextTextMapPropagator().extract(carrier={'traceparent': args.traceparent}) if args.traceparent else None + _stopped = threading.Event() terminate_on_parent_death_thread = threading.Thread( target=terminate_on_parent_death, args=[_stopped, 1.0] ) with ErtPluginContext(logger=logging.getLogger(), trace_provider=tracer_provider) as context: terminate_on_parent_death_thread.start() - with tracer.start_as_current_span(f"run_storage_server") as currentSpan: + with tracer.start_as_current_span(f"run_storage_server", ctx) as currentSpan: + logger = logging.getLogger("ert.shared.storage.info") try: - print(f"Opertation ID: {get_trace_id()}") + logger.info("Starting dark storage") run_server(args, debug=False, uvicorn_config = uvicorn_config) - except BaseException as err: - print(f"Stopped with exception {err}") + except SystemExit: + logger.info("Stopping dark storage") finally: _stopped.set() terminate_on_parent_death_thread.join() - print("Closing2") def sigterm_handler(_signo, _stack_frame): - print("handle sigterm") sys.exit(0) signal.signal(signal.SIGTERM, sigterm_handler) diff --git a/src/ert/services/storage_service.py b/src/ert/services/storage_service.py index 38914415fc8..694fae11d4d 100644 --- a/src/ert/services/storage_service.py +++ b/src/ert/services/storage_service.py @@ -9,6 +9,7 @@ from ert.dark_storage.client import Client, ConnInfo from ert.services._base_service import BaseService, _Context, local_exec_args +from ert.trace import get_traceparent from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor HTTPXClientInstrumentor().instrument() @@ -23,6 +24,7 @@ def __init__( conn_info: Mapping[str, Any] | Exception | None = None, project: str | None = None, verbose: bool = False, + traceparent: str | None = "inherit_parent" ): self._url: str | None = None @@ -31,6 +33,9 @@ def __init__( exec_args.extend(["--project", str(project)]) if verbose: exec_args.append("--verbose") + if traceparent: + traceparent = get_traceparent() if traceparent == "inherit_parent" else traceparent + exec_args.extend(["--traceparent", str(traceparent)]) super().__init__(exec_args, timeout, conn_info, project) diff --git a/src/ert/shared/storage/command.py b/src/ert/shared/storage/command.py index 05bb63e9fbb..73be6dcd75b 100644 --- a/src/ert/shared/storage/command.py +++ b/src/ert/shared/storage/command.py @@ -16,6 +16,12 @@ def add_parser_options(ap: ArgumentParser) -> None: help="Path to directory in which to create storage_server.json", default=os.getcwd(), ) + ap.add_argument( + "--traceparent", + type=str, + help="Trace parent id to be used by the storage root span", + default=None, + ) ap.add_argument( "--host", type=str, default=os.environ.get("ERT_STORAGE_HOST", "127.0.0.1") ) diff --git a/src/ert/trace.py b/src/ert/trace.py index c7a24b48b1c..2d98eba4483 100644 --- a/src/ert/trace.py +++ b/src/ert/trace.py @@ -1,6 +1,7 @@ from opentelemetry import trace from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import SpanLimits, TracerProvider +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator resource = Resource(attributes={SERVICE_NAME: "ert"}) tracer_provider = TracerProvider( @@ -13,3 +14,9 @@ def get_trace_id() -> str: return trace.format_trace_id(trace.get_current_span().get_span_context().trace_id) + +def get_traceparent() -> str | None: + carrier = {} + # Write the current context into the carrier. + TraceContextTextMapPropagator().inject(carrier) + return carrier.get('traceparent') \ No newline at end of file From e8d20e592cebb6707a694f897325113bb4d8ab43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Wed, 18 Dec 2024 15:08:57 +0100 Subject: [PATCH 3/5] Propagate ert.shared.storage.info logs This will include these log messages in azure --- src/ert/logging/storage_log.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ert/logging/storage_log.conf b/src/ert/logging/storage_log.conf index d850e528a49..68affbf98d3 100644 --- a/src/ert/logging/storage_log.conf +++ b/src/ert/logging/storage_log.conf @@ -32,7 +32,7 @@ loggers: ert.shared.storage.info: level: INFO handlers: [infohandler, file] - propagate: False + propagate: True ert.shared.status: level: INFO res: From 080b057e61827d8e9233f2a7bf794a9cf9f86999 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Wed, 18 Dec 2024 15:29:01 +0100 Subject: [PATCH 4/5] Fix style with pre-commit --- src/ert/dark_storage/app.py | 5 ++- src/ert/services/_storage_main.py | 47 ++++++++++++++++++----------- src/ert/services/storage_service.py | 9 ++++-- src/ert/trace.py | 3 +- 4 files changed, 40 insertions(+), 24 deletions(-) diff --git a/src/ert/dark_storage/app.py b/src/ert/dark_storage/app.py index 760564620e3..95b2e402a57 100644 --- a/src/ert/dark_storage/app.py +++ b/src/ert/dark_storage/app.py @@ -6,12 +6,11 @@ from fastapi import FastAPI, Request, status from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html from fastapi.responses import HTMLResponse, RedirectResponse, Response +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from ert.dark_storage.endpoints import router as endpoints_router from ert.dark_storage.exceptions import ErtStorageError -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor - class JSONEncoder(json.JSONEncoder): """ @@ -110,4 +109,4 @@ async def healthcheck() -> str: app.include_router(endpoints_router) -FastAPIInstrumentor.instrument_app(app) \ No newline at end of file +FastAPIInstrumentor.instrument_app(app) diff --git a/src/ert/services/_storage_main.py b/src/ert/services/_storage_main.py index 21b03f37955..ea08be5c344 100644 --- a/src/ert/services/_storage_main.py +++ b/src/ert/services/_storage_main.py @@ -15,6 +15,7 @@ import uvicorn import yaml +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from uvicorn.supervisors import ChangeReload from ert.logging import STORAGE_LOG_CONFIG @@ -24,9 +25,6 @@ from ert.shared.storage.command import add_parser_options from ert.trace import tracer, tracer_provider -from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from opentelemetry.trace.span import Span - DARK_STORAGE_APP = "ert.dark_storage.app:app" @@ -87,7 +85,10 @@ def _create_connection_info(sock: socket.socket, authtoken: str) -> dict[str, An return connection_info -def run_server(args: argparse.Namespace | None = None, debug: bool = False, uvicorn_config = None) -> None: + +def run_server( + args: argparse.Namespace | None = None, debug: bool = False, uvicorn_config=None +) -> None: if args is None: args = parse_args() @@ -108,7 +109,11 @@ def run_server(args: argparse.Namespace | None = None, debug: bool = False, uvic # Appropriated from uvicorn.main:run os.environ["ERT_STORAGE_NO_TOKEN"] = "1" os.environ["ERT_STORAGE_ENS_PATH"] = os.path.abspath(args.project) - config = uvicorn.Config(DARK_STORAGE_APP, **config_args) if uvicorn_config is None else uvicorn_config #uvicorn.Config() resets the logging config (overriding additional handlers added to loggers like e.g. the ert_azurelogger handler added through the pluggin system + config = ( + uvicorn.Config(DARK_STORAGE_APP, **config_args) + if uvicorn_config is None + else uvicorn_config + ) # uvicorn.Config() resets the logging config (overriding additional handlers added to loggers like e.g. the ert_azurelogger handler added through the pluggin system server = Server(config, json.dumps(connection_info)) logger = logging.getLogger("ert.shared.storage.info") @@ -151,7 +156,7 @@ def check_parent_alive() -> bool: def main(): args = parse_args() - config_args: Dict[str, Any] = {} + config_args: dict[str, Any] = {} with open(STORAGE_LOG_CONFIG, encoding="utf-8") as conf_file: logging_conf = yaml.safe_load(conf_file) logging.config.dictConfig(logging_conf) @@ -160,34 +165,42 @@ def main(): if args.debug: config_args.update(reload=True, reload_dirs=[os.path.dirname(ert_shared_path)]) - uvicorn_config = uvicorn.Config(DARK_STORAGE_APP, **config_args) # Need to run uvicorn.Config before entering the ErtPluginContext because uvicorn.Config overrides the configuration of existing loggers, thus removing log handlers added by ErtPluginContext - - ctx = TraceContextTextMapPropagator().extract(carrier={'traceparent': args.traceparent}) if args.traceparent else None + uvicorn_config = uvicorn.Config( + DARK_STORAGE_APP, **config_args + ) # Need to run uvicorn.Config before entering the ErtPluginContext because uvicorn.Config overrides the configuration of existing loggers, thus removing log handlers added by ErtPluginContext + + ctx = ( + TraceContextTextMapPropagator().extract( + carrier={"traceparent": args.traceparent} + ) + if args.traceparent + else None + ) - _stopped = threading.Event() + stopped = threading.Event() terminate_on_parent_death_thread = threading.Thread( - target=terminate_on_parent_death, args=[_stopped, 1.0] + target=terminate_on_parent_death, args=[stopped, 1.0] ) - with ErtPluginContext(logger=logging.getLogger(), trace_provider=tracer_provider) as context: + with ErtPluginContext(logger=logging.getLogger(), trace_provider=tracer_provider): terminate_on_parent_death_thread.start() - with tracer.start_as_current_span(f"run_storage_server", ctx) as currentSpan: + with tracer.start_as_current_span("run_storage_server", ctx): logger = logging.getLogger("ert.shared.storage.info") try: logger.info("Starting dark storage") - run_server(args, debug=False, uvicorn_config = uvicorn_config) + run_server(args, debug=False, uvicorn_config=uvicorn_config) except SystemExit: logger.info("Stopping dark storage") finally: - _stopped.set() + stopped.set() terminate_on_parent_death_thread.join() - def sigterm_handler(_signo, _stack_frame): sys.exit(0) + signal.signal(signal.SIGTERM, sigterm_handler) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/ert/services/storage_service.py b/src/ert/services/storage_service.py index 694fae11d4d..10a6b4c7018 100644 --- a/src/ert/services/storage_service.py +++ b/src/ert/services/storage_service.py @@ -6,14 +6,15 @@ import httpx import requests +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor from ert.dark_storage.client import Client, ConnInfo from ert.services._base_service import BaseService, _Context, local_exec_args from ert.trace import get_traceparent -from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor HTTPXClientInstrumentor().instrument() + class StorageService(BaseService): service_name = "storage" @@ -24,7 +25,7 @@ def __init__( conn_info: Mapping[str, Any] | Exception | None = None, project: str | None = None, verbose: bool = False, - traceparent: str | None = "inherit_parent" + traceparent: str | None = "inherit_parent", ): self._url: str | None = None @@ -34,7 +35,9 @@ def __init__( if verbose: exec_args.append("--verbose") if traceparent: - traceparent = get_traceparent() if traceparent == "inherit_parent" else traceparent + traceparent = ( + get_traceparent() if traceparent == "inherit_parent" else traceparent + ) exec_args.extend(["--traceparent", str(traceparent)]) super().__init__(exec_args, timeout, conn_info, project) diff --git a/src/ert/trace.py b/src/ert/trace.py index 2d98eba4483..06d0dc848bd 100644 --- a/src/ert/trace.py +++ b/src/ert/trace.py @@ -15,8 +15,9 @@ def get_trace_id() -> str: return trace.format_trace_id(trace.get_current_span().get_span_context().trace_id) + def get_traceparent() -> str | None: carrier = {} # Write the current context into the carrier. TraceContextTextMapPropagator().inject(carrier) - return carrier.get('traceparent') \ No newline at end of file + return carrier.get("traceparent") From dd4060219fba60c3a31113da39774efb9ff4b9ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20Steinkopf=20S=C3=B8hoel?= Date: Wed, 8 Jan 2025 13:39:04 +0100 Subject: [PATCH 5/5] Fix type hints --- src/ert/services/_storage_main.py | 9 ++++++--- src/ert/trace.py | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/ert/services/_storage_main.py b/src/ert/services/_storage_main.py index ea08be5c344..14b1626b560 100644 --- a/src/ert/services/_storage_main.py +++ b/src/ert/services/_storage_main.py @@ -11,6 +11,7 @@ import threading import time import warnings +from types import FrameType from typing import Any import uvicorn @@ -87,7 +88,9 @@ def _create_connection_info(sock: socket.socket, authtoken: str) -> dict[str, An def run_server( - args: argparse.Namespace | None = None, debug: bool = False, uvicorn_config=None + args: argparse.Namespace | None = None, + debug: bool = False, + uvicorn_config: uvicorn.Config | None = None, ) -> None: if args is None: args = parse_args() @@ -154,7 +157,7 @@ def check_parent_alive() -> bool: os.kill(os.getpid(), signal.SIGTERM) -def main(): +def main() -> None: args = parse_args() config_args: dict[str, Any] = {} with open(STORAGE_LOG_CONFIG, encoding="utf-8") as conf_file: @@ -195,7 +198,7 @@ def main(): terminate_on_parent_death_thread.join() -def sigterm_handler(_signo, _stack_frame): +def sigterm_handler(_signo: int, _stack_frame: FrameType | None) -> None: sys.exit(0) diff --git a/src/ert/trace.py b/src/ert/trace.py index 06d0dc848bd..8e7b6fcc70a 100644 --- a/src/ert/trace.py +++ b/src/ert/trace.py @@ -17,7 +17,7 @@ def get_trace_id() -> str: def get_traceparent() -> str | None: - carrier = {} + carrier: dict[str, str] = {} # Write the current context into the carrier. TraceContextTextMapPropagator().inject(carrier) return carrier.get("traceparent")