Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include spans for dark storage #9535

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ dependencies = [
"openpyxl", # extra dependency for pandas (excel)
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry.instrumentation.fastapi",
"opentelemetry-instrumentation-httpx",
"opentelemetry-instrumentation-threading",
"orjson",
"packaging",
Expand Down
3 changes: 3 additions & 0 deletions src/ert/dark_storage/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from fastapi import FastAPI, Request, status
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

from ert.dark_storage.endpoints import router as endpoints_router
from ert.dark_storage.exceptions import ErtStorageError
Expand Down Expand Up @@ -107,3 +108,5 @@ async def healthcheck() -> str:


app.include_router(endpoints_router)

FastAPIInstrumentor.instrument_app(app)
2 changes: 1 addition & 1 deletion src/ert/logging/storage_log.conf
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ loggers:
ert.shared.storage.info:
level: INFO
handlers: [infohandler, file]
propagate: False
propagate: True
ert.shared.status:
level: INFO
res:
Expand Down
76 changes: 61 additions & 15 deletions src/ert/services/_storage_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,26 @@
import signal
import socket
import string
import sys
import threading
import time
import warnings
from types import FrameType
from typing import Any

import uvicorn
import yaml
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from uvicorn.supervisors import ChangeReload

from ert.logging import STORAGE_LOG_CONFIG
from ert.plugins import ErtPluginContext
from ert.shared import __file__ as ert_shared_path
from ert.shared import find_available_socket
from ert.shared.storage.command import add_parser_options
from ert.trace import tracer, tracer_provider

DARK_STORAGE_APP = "ert.dark_storage.app:app"


class Server(uvicorn.Server):
Expand Down Expand Up @@ -81,7 +87,11 @@ def _create_connection_info(sock: socket.socket, authtoken: str) -> dict[str, An
return connection_info


def run_server(args: argparse.Namespace | None = None, debug: bool = False) -> None:
def run_server(
args: argparse.Namespace | None = None,
debug: bool = False,
uvicorn_config: uvicorn.Config | None = None,
) -> None:
if args is None:
args = parse_args()

Expand All @@ -102,12 +112,16 @@ def run_server(args: argparse.Namespace | None = None, debug: bool = False) -> N
# Appropriated from uvicorn.main:run
os.environ["ERT_STORAGE_NO_TOKEN"] = "1"
os.environ["ERT_STORAGE_ENS_PATH"] = os.path.abspath(args.project)
config = uvicorn.Config("ert.dark_storage.app:app", **config_args)
config = (
uvicorn.Config(DARK_STORAGE_APP, **config_args)
if uvicorn_config is None
else uvicorn_config
) # uvicorn.Config() resets the logging config (overriding additional handlers added to loggers like e.g. the ert_azurelogger handler added through the pluggin system
server = Server(config, json.dumps(connection_info))

logger = logging.getLogger("ert.shared.storage.info")
log_level = logging.INFO if args.verbose else logging.WARNING
logger.setLevel(log_level)
if args.verbose and logger.level > logging.INFO:
logger.setLevel(logging.INFO)
logger.info("Storage server is ready to accept requests. Listening on:")
for url in connection_info["urls"]:
logger.info(f" {url}")
Expand Down Expand Up @@ -143,21 +157,53 @@ def check_parent_alive() -> bool:
os.kill(os.getpid(), signal.SIGTERM)


if __name__ == "__main__":
def main() -> None:
args = parse_args()
config_args: dict[str, Any] = {}
with open(STORAGE_LOG_CONFIG, encoding="utf-8") as conf_file:
logging_conf = yaml.safe_load(conf_file)
logging.config.dictConfig(logging_conf)
config_args.update(log_config=logging_conf)
warnings.filterwarnings("ignore", category=DeprecationWarning)
uvicorn.config.LOGGING_CONFIG.clear()
uvicorn.config.LOGGING_CONFIG.update(logging_conf)
_stopped = threading.Event()

if args.debug:
config_args.update(reload=True, reload_dirs=[os.path.dirname(ert_shared_path)])
uvicorn_config = uvicorn.Config(
DARK_STORAGE_APP, **config_args
) # Need to run uvicorn.Config before entering the ErtPluginContext because uvicorn.Config overrides the configuration of existing loggers, thus removing log handlers added by ErtPluginContext
Copy link
Contributor

@eivindjahren eivindjahren Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is a bit long, same with the one on line 119


ctx = (
TraceContextTextMapPropagator().extract(
carrier={"traceparent": args.traceparent}
)
if args.traceparent
else None
)

stopped = threading.Event()
terminate_on_parent_death_thread = threading.Thread(
target=terminate_on_parent_death, args=[_stopped, 1.0]
target=terminate_on_parent_death, args=[stopped, 1.0]
)
with ErtPluginContext(logger=logging.getLogger()) as context:
with ErtPluginContext(logger=logging.getLogger(), trace_provider=tracer_provider):
terminate_on_parent_death_thread.start()
try:
run_server(debug=False)
finally:
_stopped.set()
terminate_on_parent_death_thread.join()
with tracer.start_as_current_span("run_storage_server", ctx):
logger = logging.getLogger("ert.shared.storage.info")
try:
logger.info("Starting dark storage")
run_server(args, debug=False, uvicorn_config=uvicorn_config)
except SystemExit:
logger.info("Stopping dark storage")
finally:
stopped.set()
terminate_on_parent_death_thread.join()


def sigterm_handler(_signo: int, _stack_frame: FrameType | None) -> None:
sys.exit(0)


signal.signal(signal.SIGTERM, sigterm_handler)


if __name__ == "__main__":
main()
10 changes: 10 additions & 0 deletions src/ert/services/storage_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

import httpx
import requests
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

from ert.dark_storage.client import Client, ConnInfo
from ert.services._base_service import BaseService, _Context, local_exec_args
from ert.trace import get_traceparent

HTTPXClientInstrumentor().instrument()


class StorageService(BaseService):
Expand All @@ -21,6 +25,7 @@ def __init__(
conn_info: Mapping[str, Any] | Exception | None = None,
project: str | None = None,
verbose: bool = False,
traceparent: str | None = "inherit_parent",
):
self._url: str | None = None

Expand All @@ -29,6 +34,11 @@ def __init__(
exec_args.extend(["--project", str(project)])
if verbose:
exec_args.append("--verbose")
if traceparent:
traceparent = (
get_traceparent() if traceparent == "inherit_parent" else traceparent
)
exec_args.extend(["--traceparent", str(traceparent)])

super().__init__(exec_args, timeout, conn_info, project)

Expand Down
6 changes: 6 additions & 0 deletions src/ert/shared/storage/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ def add_parser_options(ap: ArgumentParser) -> None:
help="Path to directory in which to create storage_server.json",
default=os.getcwd(),
)
ap.add_argument(
"--traceparent",
type=str,
help="Trace parent id to be used by the storage root span",
default=None,
)
ap.add_argument(
"--host", type=str, default=os.environ.get("ERT_STORAGE_HOST", "127.0.0.1")
)
Expand Down
8 changes: 8 additions & 0 deletions src/ert/trace.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import SpanLimits, TracerProvider
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

resource = Resource(attributes={SERVICE_NAME: "ert"})
tracer_provider = TracerProvider(
Expand All @@ -13,3 +14,10 @@

def get_trace_id() -> str:
return trace.format_trace_id(trace.get_current_span().get_span_context().trace_id)


def get_traceparent() -> str | None:
carrier: dict[str, str] = {}
# Write the current context into the carrier.
TraceContextTextMapPropagator().inject(carrier)
return carrier.get("traceparent")
Loading