Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ redirect list_dynamic_services via dynamic-scheduler #6893

Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions packages/common-library/src/common_library/unset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import Any


class UnSet:
INSTANCE: "UnSet"


UnSet.INSTANCE = UnSet()
GitHK marked this conversation as resolved.
Show resolved Hide resolved


def as_dict_exclude_unset(**params) -> dict[str, Any]:
return {k: v for k, v in params.items() if not isinstance(v, UnSet)}
15 changes: 15 additions & 0 deletions packages/common-library/tests/test_unset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Any

from common_library.unset import UnSet, as_dict_exclude_unset


def test_as_dict_exclude_unset():
def f(
par1: str | UnSet = UnSet.INSTANCE, par2: int | UnSet = UnSet.INSTANCE
) -> dict[str, Any]:
return as_dict_exclude_unset(par1=par1, par2=par2)

assert f() == {}
assert f(par1="hi") == {"par1": "hi"}
assert f(par2=4) == {"par2": 4}
assert f(par1="hi", par2=4) == {"par1": "hi", "par2": 4}
Original file line number Diff line number Diff line change
Expand Up @@ -92,32 +92,33 @@ class RunningDynamicServiceDetails(ServiceDetails):
ignored_types=(cached_property,),
json_schema_extra={
"examples": [
# legacy
{
"boot_type": "V0",
"key": "simcore/services/dynamic/3dviewer",
"version": "2.4.5",
"user_id": 234,
"project_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe",
"uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"basepath": "/x/75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"host": "3dviewer_75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"internal_port": 8888,
"state": "running",
"message": "",
"node_uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"service_key": "simcore/services/dynamic/raw-graphs",
"service_version": "2.10.6",
"user_id": 1,
"project_id": "32fb4eb6-ab30-11ef-9ee4-0242ac140008",
"service_uuid": "0cd049ba-cd6b-4a12-b416-a50c9bc8e7bb",
"service_basepath": "/x/0cd049ba-cd6b-4a12-b416-a50c9bc8e7bb",
"service_host": "raw-graphs_0cd049ba-cd6b-4a12-b416-a50c9bc8e7bb",
"service_port": 4000,
"published_port": None,
"entry_point": "",
"service_state": "running",
"service_message": "",
},
GitHK marked this conversation as resolved.
Show resolved Hide resolved
# new style
{
"service_key": "simcore/services/dynamic/jupyter-math",
"service_version": "3.0.3",
"user_id": 1,
"project_id": "32fb4eb6-ab30-11ef-9ee4-0242ac140008",
"service_uuid": "6e3cad3a-eb64-43de-b476-9ac3c413fd9c",
"boot_type": "V2",
"key": "simcore/services/dynamic/dy-static-file-viewer-dynamic-sidecar",
"version": "1.0.0",
"user_id": 234,
"project_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe",
"uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"host": "dy-sidecar_75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"internal_port": 80,
"state": "running",
"message": "",
"node_uuid": "75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"service_host": "dy-sidecar_6e3cad3a-eb64-43de-b476-9ac3c413fd9c",
"service_port": 8888,
"service_state": "running",
"service_message": "",
},
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.users import UserID
from pydantic import NonNegativeInt, TypeAdapter
from servicelib.logging_utils import log_decorator
from servicelib.rabbitmq import RabbitMQRPCClient
Expand All @@ -29,6 +31,24 @@
_RPC_METHOD_NAME_ADAPTER: TypeAdapter[RPCMethodName] = TypeAdapter(RPCMethodName)


@log_decorator(_logger, level=logging.DEBUG)
async def list_tracked_dynamic_services(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
user_id: UserID | None = None,
project_id: ProjectID | None = None,
) -> list[DynamicServiceGet]:
result = await rabbitmq_rpc_client.request(
DYNAMIC_SCHEDULER_RPC_NAMESPACE,
_RPC_METHOD_NAME_ADAPTER.validate_python("list_tracked_dynamic_services"),
user_id=user_id,
project_id=project_id,
timeout_s=_RPC_DEFAULT_TIMEOUT_S,
)
assert isinstance(result, list) # nosec
return result


@log_decorator(_logger, level=logging.DEBUG)
async def get_service_status(
rabbitmq_rpc_client: RabbitMQRPCClient, *, node_id: NodeID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID
from servicelib.rabbitmq import RPCRouter
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import (
ServiceWaitingForManualInterventionError,
Expand All @@ -17,6 +19,15 @@
router = RPCRouter()


@router.expose()
async def list_tracked_dynamic_services(
app: FastAPI, *, user_id: UserID | None = None, project_id: ProjectID | None = None
) -> list[DynamicServiceGet]:
return await scheduler_interface.list_tracked_dynamic_services(
app, user_id=user_id, project_id=project_id
)


@router.expose()
async def get_service_status(
app: FastAPI, *, node_id: NodeID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
DynamicServiceStart,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID
from pydantic import TypeAdapter
from servicelib.fastapi.app_state import SingletonInAppStateMixin
from servicelib.fastapi.http_client import AttachLifespanMixin, HasClientSetupInterface
Expand Down Expand Up @@ -73,7 +75,7 @@ async def stop_dynamic_service(
node_id: NodeID,
simcore_user_agent: str,
save_state: bool,
timeout: datetime.timedelta
timeout: datetime.timedelta, # noqa: ASYNC109
) -> None:
try:
await self.thin_client.delete_dynamic_service(
Expand All @@ -98,6 +100,14 @@ async def stop_dynamic_service(

raise

async def list_tracked_dynamic_services(
self, *, user_id: UserID | None = None, project_id: ProjectID | None = None
) -> list[DynamicServiceGet]:
response = await self.thin_client.get_dynamic_services(
user_id=user_id, project_id=project_id
)
return TypeAdapter(list[DynamicServiceGet]).validate_python(response.json())


def setup_director_v2(app: FastAPI) -> None:
public_client = DirectorV2Client(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
from typing import cast

from common_library.json_serialization import json_dumps
from common_library.unset import UnSet, as_dict_exclude_unset
from fastapi import FastAPI, status
from httpx import Response, Timeout
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
)
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_resources import ServiceResourcesDictHelpers
from models_library.users import UserID
from servicelib.common_headers import (
X_DYNAMIC_SIDECAR_REQUEST_DNS,
X_DYNAMIC_SIDECAR_REQUEST_SCHEME,
Expand Down Expand Up @@ -108,3 +111,16 @@ async def _(
)

return await _(self)

@retry_on_errors()
@expect_status(status.HTTP_200_OK)
async def get_dynamic_services(
self,
*,
user_id: UserID | None | UnSet = UnSet.INSTANCE,
project_id: ProjectID | None | UnSet = UnSet.INSTANCE,
) -> Response:
return await self.client.get(
"/dynamic_services",
params=as_dict_exclude_unset(user_id=user_id, project_id=project_id),
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,28 @@
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID

from ..core.settings import ApplicationSettings
from .director_v2 import DirectorV2Client
from .service_tracker import set_request_as_running, set_request_as_stopped


async def list_tracked_dynamic_services(
app: FastAPI, *, user_id: UserID | None = None, project_id: ProjectID | None = None
) -> list[DynamicServiceGet]:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
raise NotImplementedError

director_v2_client = DirectorV2Client.get_from_app_state(app)
return await director_v2_client.list_tracked_dynamic_services(
user_id=user_id, project_id=project_id
)


async def get_service_status(
app: FastAPI, *, node_id: NodeID
) -> NodeGet | DynamicServiceGet | NodeGetIdle:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ def mock_director_v2_service_state(
assert_all_called=False,
assert_all_mocked=True, # IMPORTANT: KEEP always True!
) as mock:
mock.get("/dynamic_services").respond(
status.HTTP_200_OK,
text=json.dumps(
jsonable_encoder(
DynamicServiceGet.model_config["json_schema_extra"]["examples"]
)
),
)

mock.get(f"/dynamic_services/{node_id_new_style}").respond(
status.HTTP_200_OK, text=service_status_new_style.model_dump_json()
)
Expand Down Expand Up @@ -177,6 +186,17 @@ async def rpc_client(
return await rabbitmq_rpc_client("client")


async def test_list_tracked_dynamic_services(rpc_client: RabbitMQRPCClient):
results = await services.list_tracked_dynamic_services(
rpc_client, user_id=None, project_id=None
)
assert len(results) == 2
assert results == [
TypeAdapter(DynamicServiceGet).validate_python(x)
for x in DynamicServiceGet.model_config["json_schema_extra"]["examples"]
]


async def test_get_state(
rpc_client: RabbitMQRPCClient,
node_id_new_style: NodeID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ def _get_dynamic_service_get_from(
service_state: ServiceState,
) -> DynamicServiceGet:
dict_data = DynamicServiceGet.model_config["json_schema_extra"]["examples"][1]
assert "state" in dict_data
dict_data["state"] = service_state
assert "service_state" in dict_data
dict_data["service_state"] = service_state
return TypeAdapter(DynamicServiceGet).validate_python(dict_data)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ def _get_dynamic_service_get_legacy_with(
_add_to_dict(
dict_data,
[
("state", state),
("uuid", f"{node_id}"),
("node_uuid", f"{node_id}"),
("service_state", state),
("service_uuid", f"{node_id}"),
],
)
return TypeAdapter(DynamicServiceGet).validate_python(dict_data)
Expand All @@ -116,9 +115,8 @@ def _get_dynamic_service_get_new_style_with(
_add_to_dict(
dict_data,
[
("state", state),
("uuid", f"{node_id}"),
("node_uuid", f"{node_id}"),
("service_state", state),
("service_uuid", f"{node_id}"),
],
)
return TypeAdapter(DynamicServiceGet).validate_python(dict_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
import logging

from aiohttp import web
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.projects import ProjectID
from models_library.services import ServicePortKey
from pydantic import BaseModel, NonNegativeInt, TypeAdapter
from pydantic.types import PositiveInt
from pydantic import NonNegativeInt
from servicelib.logging_utils import log_decorator
from yarl import URL

Expand All @@ -22,36 +20,6 @@
_log = logging.getLogger(__name__)


class _Params(BaseModel):
user_id: PositiveInt | None = None
project_id: str | None = None


async def list_dynamic_services(
app: web.Application,
user_id: PositiveInt | None = None,
project_id: str | None = None,
) -> list[DynamicServiceGet]:
params = _Params(user_id=user_id, project_id=project_id)
params_dict = params.model_dump(exclude_none=True)
settings: DirectorV2Settings = get_plugin_settings(app)
if params_dict: # Update query doesnt work with no params to unwrap
backend_url = (settings.base_url / "dynamic_services").update_query(
**params_dict
)
else:
backend_url = settings.base_url / "dynamic_services"

services = await request_director_v2(
app, "GET", backend_url, expected_status=web.HTTPOk
)

if services is None:
services = []
assert isinstance(services, list) # nosec
return TypeAdapter(list[DynamicServiceGet]).validate_python(services)


# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191
@log_decorator(logger=_log)
async def retrieve(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
)
from ._core_dynamic_services import (
get_project_inactivity,
list_dynamic_services,
request_retrieve_dyn_service,
restart_dynamic_service,
retrieve,
Expand All @@ -39,7 +38,6 @@
"get_project_run_policy",
"is_healthy",
"is_pipeline_running",
"list_dynamic_services",
"request_retrieve_dyn_service",
"restart_dynamic_service",
"retrieve",
Expand Down
Loading
Loading