Skip to content

Commit

Permalink
Include spans for dark storage (#9535)
Browse files Browse the repository at this point in the history
  • Loading branch information
HakonSohoel authored Jan 9, 2025
1 parent 71e38f2 commit 3a31ffb
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 16 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ dependencies = [
"openpyxl", # extra dependency for pandas (excel)
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry.instrumentation.fastapi",
"opentelemetry-instrumentation-httpx",
"opentelemetry-instrumentation-threading",
"orjson",
"packaging",
Expand Down
3 changes: 3 additions & 0 deletions src/ert/dark_storage/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from fastapi import FastAPI, Request, status
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

from ert.dark_storage.endpoints import router as endpoints_router
from ert.dark_storage.exceptions import ErtStorageError
Expand Down Expand Up @@ -107,3 +108,5 @@ async def healthcheck() -> str:


app.include_router(endpoints_router)

FastAPIInstrumentor.instrument_app(app)
2 changes: 1 addition & 1 deletion src/ert/logging/storage_log.conf
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ loggers:
ert.shared.storage.info:
level: INFO
handlers: [infohandler, file]
propagate: False
propagate: True
ert.shared.status:
level: INFO
res:
Expand Down
76 changes: 61 additions & 15 deletions src/ert/services/_storage_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,26 @@
import signal
import socket
import string
import sys
import threading
import time
import warnings
from types import FrameType
from typing import Any

import uvicorn
import yaml
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from uvicorn.supervisors import ChangeReload

from ert.logging import STORAGE_LOG_CONFIG
from ert.plugins import ErtPluginContext
from ert.shared import __file__ as ert_shared_path
from ert.shared import find_available_socket
from ert.shared.storage.command import add_parser_options
from ert.trace import tracer, tracer_provider

DARK_STORAGE_APP = "ert.dark_storage.app:app"


class Server(uvicorn.Server):
Expand Down Expand Up @@ -81,7 +87,11 @@ def _create_connection_info(sock: socket.socket, authtoken: str) -> dict[str, An
return connection_info


def run_server(args: argparse.Namespace | None = None, debug: bool = False) -> None:
def run_server(
args: argparse.Namespace | None = None,
debug: bool = False,
uvicorn_config: uvicorn.Config | None = None,
) -> None:
if args is None:
args = parse_args()

Expand All @@ -102,12 +112,16 @@ def run_server(args: argparse.Namespace | None = None, debug: bool = False) -> N
# Appropriated from uvicorn.main:run
os.environ["ERT_STORAGE_NO_TOKEN"] = "1"
os.environ["ERT_STORAGE_ENS_PATH"] = os.path.abspath(args.project)
config = uvicorn.Config("ert.dark_storage.app:app", **config_args)
config = (
uvicorn.Config(DARK_STORAGE_APP, **config_args)
if uvicorn_config is None
else uvicorn_config
) # uvicorn.Config() resets the logging config, overriding additional handlers added to loggers (e.g. the ert_azurelogger handler added through the plugin system)
server = Server(config, json.dumps(connection_info))

logger = logging.getLogger("ert.shared.storage.info")
log_level = logging.INFO if args.verbose else logging.WARNING
logger.setLevel(log_level)
if args.verbose and logger.level > logging.INFO:
logger.setLevel(logging.INFO)
logger.info("Storage server is ready to accept requests. Listening on:")
for url in connection_info["urls"]:
logger.info(f" {url}")
Expand Down Expand Up @@ -143,21 +157,53 @@ def check_parent_alive() -> bool:
os.kill(os.getpid(), signal.SIGTERM)


if __name__ == "__main__":
def main() -> None:
args = parse_args()
config_args: dict[str, Any] = {}
with open(STORAGE_LOG_CONFIG, encoding="utf-8") as conf_file:
logging_conf = yaml.safe_load(conf_file)
logging.config.dictConfig(logging_conf)
config_args.update(log_config=logging_conf)
warnings.filterwarnings("ignore", category=DeprecationWarning)
uvicorn.config.LOGGING_CONFIG.clear()
uvicorn.config.LOGGING_CONFIG.update(logging_conf)
_stopped = threading.Event()

if args.debug:
config_args.update(reload=True, reload_dirs=[os.path.dirname(ert_shared_path)])
uvicorn_config = uvicorn.Config(
DARK_STORAGE_APP, **config_args
) # Need to run uvicorn.Config before entering the ErtPluginContext because uvicorn.Config overrides the configuration of existing loggers, thus removing log handlers added by ErtPluginContext

ctx = (
TraceContextTextMapPropagator().extract(
carrier={"traceparent": args.traceparent}
)
if args.traceparent
else None
)

stopped = threading.Event()
terminate_on_parent_death_thread = threading.Thread(
target=terminate_on_parent_death, args=[_stopped, 1.0]
target=terminate_on_parent_death, args=[stopped, 1.0]
)
with ErtPluginContext(logger=logging.getLogger()) as context:
with ErtPluginContext(logger=logging.getLogger(), trace_provider=tracer_provider):
terminate_on_parent_death_thread.start()
try:
run_server(debug=False)
finally:
_stopped.set()
terminate_on_parent_death_thread.join()
with tracer.start_as_current_span("run_storage_server", ctx):
logger = logging.getLogger("ert.shared.storage.info")
try:
logger.info("Starting dark storage")
run_server(args, debug=False, uvicorn_config=uvicorn_config)
except SystemExit:
logger.info("Stopping dark storage")
finally:
stopped.set()
terminate_on_parent_death_thread.join()


def sigterm_handler(_signo: int, _stack_frame: FrameType | None) -> None:
sys.exit(0)


signal.signal(signal.SIGTERM, sigterm_handler)


if __name__ == "__main__":
main()
10 changes: 10 additions & 0 deletions src/ert/services/storage_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

import httpx
import requests
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

from ert.dark_storage.client import Client, ConnInfo
from ert.services._base_service import BaseService, _Context, local_exec_args
from ert.trace import get_traceparent

HTTPXClientInstrumentor().instrument()


class StorageService(BaseService):
Expand All @@ -21,6 +25,7 @@ def __init__(
conn_info: Mapping[str, Any] | Exception | None = None,
project: str | None = None,
verbose: bool = False,
traceparent: str | None = "inherit_parent",
):
self._url: str | None = None

Expand All @@ -29,6 +34,11 @@ def __init__(
exec_args.extend(["--project", str(project)])
if verbose:
exec_args.append("--verbose")
if traceparent:
traceparent = (
get_traceparent() if traceparent == "inherit_parent" else traceparent
)
exec_args.extend(["--traceparent", str(traceparent)])

super().__init__(exec_args, timeout, conn_info, project)

Expand Down
6 changes: 6 additions & 0 deletions src/ert/shared/storage/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ def add_parser_options(ap: ArgumentParser) -> None:
help="Path to directory in which to create storage_server.json",
default=os.getcwd(),
)
ap.add_argument(
"--traceparent",
type=str,
help="Trace parent id to be used by the storage root span",
default=None,
)
ap.add_argument(
"--host", type=str, default=os.environ.get("ERT_STORAGE_HOST", "127.0.0.1")
)
Expand Down
8 changes: 8 additions & 0 deletions src/ert/trace.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import SpanLimits, TracerProvider
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

resource = Resource(attributes={SERVICE_NAME: "ert"})
tracer_provider = TracerProvider(
Expand All @@ -13,3 +14,10 @@

def get_trace_id() -> str:
return trace.format_trace_id(trace.get_current_span().get_span_context().trace_id)


def get_traceparent() -> str | None:
    """Return the ``traceparent`` value for the current trace context.

    The current context is injected into a fresh, empty carrier using a
    ``TraceContextTextMapPropagator``; the value stored under the
    ``"traceparent"`` key is returned. ``None`` is returned when the
    propagator did not write that key (presumably when there is no valid
    active span -- confirm against the OpenTelemetry propagator docs).
    """
    headers: dict[str, str] = {}
    propagator = TraceContextTextMapPropagator()
    # inject() mutates the carrier in place; it has no useful return value.
    propagator.inject(headers)
    return headers.get("traceparent")

0 comments on commit 3a31ffb

Please sign in to comment.