Skip to content

Commit

Permalink
♻️ redirect list_dynamic_services via dynamic-scheduler (#6893)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrei Neagu <[email protected]>
  • Loading branch information
GitHK and Andrei Neagu authored Dec 9, 2024
1 parent 1ab9b99 commit 0651c06
Show file tree
Hide file tree
Showing 33 changed files with 389 additions and 280 deletions.
12 changes: 12 additions & 0 deletions packages/common-library/src/common_library/unset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import Any


class UnSet:
VALUE: "UnSet"


UnSet.VALUE = UnSet()


def as_dict_exclude_unset(**params) -> dict[str, Any]:
return {k: v for k, v in params.items() if not isinstance(v, UnSet)}
15 changes: 15 additions & 0 deletions packages/common-library/tests/test_unset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Any

from common_library.unset import UnSet, as_dict_exclude_unset


def test_as_dict_exclude_unset():
def f(
par1: str | UnSet = UnSet.VALUE, par2: int | UnSet = UnSet.VALUE
) -> dict[str, Any]:
return as_dict_exclude_unset(par1=par1, par2=par2)

assert f() == {}
assert f(par1="hi") == {"par1": "hi"}
assert f(par2=4) == {"par2": 4}
assert f(par1="hi", par2=4) == {"par1": "hi", "par2": 4}
Original file line number Diff line number Diff line change
Expand Up @@ -92,32 +92,33 @@ class RunningDynamicServiceDetails(ServiceDetails):
ignored_types=(cached_property,),
json_schema_extra={
"examples": [
# legacy
{
"boot_type": "V0",
"key": "simcore/services/dynamic/3dviewer",
"version": "2.4.5",
"user_id": 234,
"project_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe",
"uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"basepath": "/x/75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"host": "3dviewer_75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"internal_port": 8888,
"state": "running",
"message": "",
"node_uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"service_key": "simcore/services/dynamic/raw-graphs",
"service_version": "2.10.6",
"user_id": 1,
"project_id": "32fb4eb6-ab30-11ef-9ee4-0242ac140008",
"service_uuid": "0cd049ba-cd6b-4a12-b416-a50c9bc8e7bb",
"service_basepath": "/x/0cd049ba-cd6b-4a12-b416-a50c9bc8e7bb",
"service_host": "raw-graphs_0cd049ba-cd6b-4a12-b416-a50c9bc8e7bb",
"service_port": 4000,
"published_port": None,
"entry_point": "",
"service_state": "running",
"service_message": "",
},
# new style
{
"service_key": "simcore/services/dynamic/jupyter-math",
"service_version": "3.0.3",
"user_id": 1,
"project_id": "32fb4eb6-ab30-11ef-9ee4-0242ac140008",
"service_uuid": "6e3cad3a-eb64-43de-b476-9ac3c413fd9c",
"boot_type": "V2",
"key": "simcore/services/dynamic/dy-static-file-viewer-dynamic-sidecar",
"version": "1.0.0",
"user_id": 234,
"project_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe",
"uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"host": "dy-sidecar_75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"internal_port": 80,
"state": "running",
"message": "",
"node_uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"service_host": "dy-sidecar_6e3cad3a-eb64-43de-b476-9ac3c413fd9c",
"service_port": 8888,
"service_state": "running",
"service_message": "",
},
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.users import UserID
from pydantic import NonNegativeInt, TypeAdapter
from servicelib.logging_utils import log_decorator
from servicelib.rabbitmq import RabbitMQRPCClient
Expand All @@ -29,6 +31,24 @@
_RPC_METHOD_NAME_ADAPTER: TypeAdapter[RPCMethodName] = TypeAdapter(RPCMethodName)


@log_decorator(_logger, level=logging.DEBUG)
async def list_tracked_dynamic_services(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
user_id: UserID | None = None,
project_id: ProjectID | None = None,
) -> list[DynamicServiceGet]:
result = await rabbitmq_rpc_client.request(
DYNAMIC_SCHEDULER_RPC_NAMESPACE,
_RPC_METHOD_NAME_ADAPTER.validate_python("list_tracked_dynamic_services"),
user_id=user_id,
project_id=project_id,
timeout_s=_RPC_DEFAULT_TIMEOUT_S,
)
assert isinstance(result, list) # nosec
return result


@log_decorator(_logger, level=logging.DEBUG)
async def get_service_status(
rabbitmq_rpc_client: RabbitMQRPCClient, *, node_id: NodeID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID
from servicelib.rabbitmq import RPCRouter
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import (
ServiceWaitingForManualInterventionError,
Expand All @@ -17,6 +19,15 @@
router = RPCRouter()


@router.expose()
async def list_tracked_dynamic_services(
app: FastAPI, *, user_id: UserID | None = None, project_id: ProjectID | None = None
) -> list[DynamicServiceGet]:
return await scheduler_interface.list_tracked_dynamic_services(
app, user_id=user_id, project_id=project_id
)


@router.expose()
async def get_service_status(
app: FastAPI, *, node_id: NodeID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
DynamicServiceStart,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID
from pydantic import TypeAdapter
from servicelib.fastapi.app_state import SingletonInAppStateMixin
from servicelib.fastapi.http_client import AttachLifespanMixin, HasClientSetupInterface
Expand Down Expand Up @@ -73,7 +75,7 @@ async def stop_dynamic_service(
node_id: NodeID,
simcore_user_agent: str,
save_state: bool,
timeout: datetime.timedelta
timeout: datetime.timedelta, # noqa: ASYNC109
) -> None:
try:
await self.thin_client.delete_dynamic_service(
Expand All @@ -98,6 +100,14 @@ async def stop_dynamic_service(

raise

async def list_tracked_dynamic_services(
self, *, user_id: UserID | None = None, project_id: ProjectID | None = None
) -> list[DynamicServiceGet]:
response = await self.thin_client.get_dynamic_services(
user_id=user_id, project_id=project_id
)
return TypeAdapter(list[DynamicServiceGet]).validate_python(response.json())


def setup_director_v2(app: FastAPI) -> None:
public_client = DirectorV2Client(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
from typing import cast

from common_library.json_serialization import json_dumps
from common_library.unset import UnSet, as_dict_exclude_unset
from fastapi import FastAPI, status
from httpx import Response, Timeout
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
)
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_resources import ServiceResourcesDictHelpers
from models_library.users import UserID
from servicelib.common_headers import (
X_DYNAMIC_SIDECAR_REQUEST_DNS,
X_DYNAMIC_SIDECAR_REQUEST_SCHEME,
Expand Down Expand Up @@ -108,3 +111,16 @@ async def _(
)

return await _(self)

@retry_on_errors()
@expect_status(status.HTTP_200_OK)
async def get_dynamic_services(
self,
*,
user_id: UserID | None | UnSet = UnSet.VALUE,
project_id: ProjectID | None | UnSet = UnSet.VALUE,
) -> Response:
return await self.client.get(
"/dynamic_services",
params=as_dict_exclude_unset(user_id=user_id, project_id=project_id),
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,28 @@
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID

from ..core.settings import ApplicationSettings
from .director_v2 import DirectorV2Client
from .service_tracker import set_request_as_running, set_request_as_stopped


async def list_tracked_dynamic_services(
app: FastAPI, *, user_id: UserID | None = None, project_id: ProjectID | None = None
) -> list[DynamicServiceGet]:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
raise NotImplementedError

director_v2_client = DirectorV2Client.get_from_app_state(app)
return await director_v2_client.list_tracked_dynamic_services(
user_id=user_id, project_id=project_id
)


async def get_service_status(
app: FastAPI, *, node_id: NodeID
) -> NodeGet | DynamicServiceGet | NodeGetIdle:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ def mock_director_v2_service_state(
assert_all_called=False,
assert_all_mocked=True, # IMPORTANT: KEEP always True!
) as mock:
mock.get("/dynamic_services").respond(
status.HTTP_200_OK,
text=json.dumps(
jsonable_encoder(
DynamicServiceGet.model_config["json_schema_extra"]["examples"]
)
),
)

mock.get(f"/dynamic_services/{node_id_new_style}").respond(
status.HTTP_200_OK, text=service_status_new_style.model_dump_json()
)
Expand Down Expand Up @@ -177,6 +186,17 @@ async def rpc_client(
return await rabbitmq_rpc_client("client")


async def test_list_tracked_dynamic_services(rpc_client: RabbitMQRPCClient):
results = await services.list_tracked_dynamic_services(
rpc_client, user_id=None, project_id=None
)
assert len(results) == 2
assert results == [
TypeAdapter(DynamicServiceGet).validate_python(x)
for x in DynamicServiceGet.model_config["json_schema_extra"]["examples"]
]


async def test_get_state(
rpc_client: RabbitMQRPCClient,
node_id_new_style: NodeID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ def _get_dynamic_service_get_from(
service_state: ServiceState,
) -> DynamicServiceGet:
dict_data = DynamicServiceGet.model_config["json_schema_extra"]["examples"][1]
assert "state" in dict_data
dict_data["state"] = service_state
assert "service_state" in dict_data
dict_data["service_state"] = service_state
return TypeAdapter(DynamicServiceGet).validate_python(dict_data)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ def _get_dynamic_service_get_legacy_with(
_add_to_dict(
dict_data,
[
("state", state),
("uuid", f"{node_id}"),
("node_uuid", f"{node_id}"),
("service_state", state),
("service_uuid", f"{node_id}"),
],
)
return TypeAdapter(DynamicServiceGet).validate_python(dict_data)
Expand All @@ -116,9 +115,8 @@ def _get_dynamic_service_get_new_style_with(
_add_to_dict(
dict_data,
[
("state", state),
("uuid", f"{node_id}"),
("node_uuid", f"{node_id}"),
("service_state", state),
("service_uuid", f"{node_id}"),
],
)
return TypeAdapter(DynamicServiceGet).validate_python(dict_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
import logging

from aiohttp import web
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.projects import ProjectID
from models_library.services import ServicePortKey
from pydantic import BaseModel, NonNegativeInt, TypeAdapter
from pydantic.types import PositiveInt
from pydantic import NonNegativeInt
from servicelib.logging_utils import log_decorator
from yarl import URL

Expand All @@ -22,36 +20,6 @@
_log = logging.getLogger(__name__)


class _Params(BaseModel):
user_id: PositiveInt | None = None
project_id: str | None = None


async def list_dynamic_services(
app: web.Application,
user_id: PositiveInt | None = None,
project_id: str | None = None,
) -> list[DynamicServiceGet]:
params = _Params(user_id=user_id, project_id=project_id)
params_dict = params.model_dump(exclude_none=True)
settings: DirectorV2Settings = get_plugin_settings(app)
if params_dict: # Update query doesnt work with no params to unwrap
backend_url = (settings.base_url / "dynamic_services").update_query(
**params_dict
)
else:
backend_url = settings.base_url / "dynamic_services"

services = await request_director_v2(
app, "GET", backend_url, expected_status=web.HTTPOk
)

if services is None:
services = []
assert isinstance(services, list) # nosec
return TypeAdapter(list[DynamicServiceGet]).validate_python(services)


# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191
@log_decorator(logger=_log)
async def retrieve(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
)
from ._core_dynamic_services import (
get_project_inactivity,
list_dynamic_services,
request_retrieve_dyn_service,
restart_dynamic_service,
retrieve,
Expand All @@ -39,7 +38,6 @@
"get_project_run_policy",
"is_healthy",
"is_pipeline_running",
"list_dynamic_services",
"request_retrieve_dyn_service",
"restart_dynamic_service",
"retrieve",
Expand Down
Loading

0 comments on commit 0651c06

Please sign in to comment.