diff --git a/.env-devel b/.env-devel index 17f909698c2..b13d55b97a9 100644 --- a/.env-devel +++ b/.env-devel @@ -126,6 +126,7 @@ DIRECTOR_V2_TRACING={} # DYNAMIC_SCHEDULER ---- DYNAMIC_SCHEDULER_LOGLEVEL=DEBUG DYNAMIC_SCHEDULER_PROFILING=1 +DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER=0 DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT=01:00:00 DYNAMIC_SCHEDULER_TRACING={} DYNAMIC_SCHEDULER_UI_STORAGE_SECRET=adminadmin diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 5da1a28ba0d..265d29e56ed 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -562,6 +562,7 @@ services: REDIS_PASSWORD: ${REDIS_PASSWORD} DIRECTOR_V2_HOST: ${DIRECTOR_V2_HOST} DIRECTOR_V2_PORT: ${DIRECTOR_V2_PORT} + DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: ${DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER} DYNAMIC_SCHEDULER_LOGLEVEL: ${DYNAMIC_SCHEDULER_LOGLEVEL} DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: ${DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT} DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING} diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py index 65fc96dd660..76a8e30ceec 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py @@ -12,9 +12,7 @@ ServiceWasNotFoundError, ) -from ...core.settings import ApplicationSettings -from ...services.director_v2 import DirectorV2Client -from ...services.service_tracker import set_request_as_running, set_request_as_stopped +from ...services import scheduler_interface router = RPCRouter() @@ -23,23 +21,16 @@ async def get_service_status( app: FastAPI, *, node_id: NodeID ) -> NodeGet | DynamicServiceGet | NodeGetIdle: - director_v2_client = DirectorV2Client.get_from_app_state(app) - response: NodeGet | DynamicServiceGet | NodeGetIdle = ( - await director_v2_client.get_status(node_id) - ) - return response + return await scheduler_interface.get_service_status(app, node_id=node_id) @router.expose() async def run_dynamic_service( app: FastAPI, *, dynamic_service_start: DynamicServiceStart ) -> NodeGet | DynamicServiceGet: - director_v2_client = DirectorV2Client.get_from_app_state(app) - response: NodeGet | DynamicServiceGet = ( - await director_v2_client.run_dynamic_service(dynamic_service_start) + return await scheduler_interface.run_dynamic_service( + app, dynamic_service_start=dynamic_service_start ) - await set_request_as_running(app, dynamic_service_start) - return response @router.expose( @@ -51,12 +42,6 @@ async def run_dynamic_service( async def stop_dynamic_service( app: FastAPI, *, dynamic_service_stop: DynamicServiceStop ) -> None: - director_v2_client = DirectorV2Client.get_from_app_state(app) - settings: ApplicationSettings = app.state.settings - await director_v2_client.stop_dynamic_service( - node_id=dynamic_service_stop.node_id, - simcore_user_agent=dynamic_service_stop.simcore_user_agent, - save_state=dynamic_service_stop.save_state, - timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT, + return await scheduler_interface.stop_dynamic_service( + app, dynamic_service_stop=dynamic_service_stop ) - await set_request_as_stopped(app, dynamic_service_stop) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py index 94acb6eaac4..9f046943344 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py @@ -68,6 +68,14 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): ), ) + DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: bool = Field( + default=False, + description=( + "this is a way to switch between different dynamic schedulers for the new style services" + # NOTE: this option should be removed when the scheduling will be done via this service + ), + ) + @field_validator("DYNAMIC_SCHEDULER_LOGLEVEL", mode="before") @classmethod def _validate_log_level(cls, value: str) -> str: diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py new file mode 100644 index 00000000000..1d4bcdd112b --- /dev/null +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py @@ -0,0 +1,60 @@ +from fastapi import FastAPI +from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( + DynamicServiceStart, + DynamicServiceStop, +) +from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle +from models_library.projects_nodes_io import NodeID + +from ..core.settings import ApplicationSettings +from .director_v2 import DirectorV2Client +from .service_tracker import set_request_as_running, set_request_as_stopped + + +async def get_service_status( + app: FastAPI, *, node_id: NodeID +) -> NodeGet | DynamicServiceGet | NodeGetIdle: + settings: ApplicationSettings = app.state.settings + if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: + raise NotImplementedError + + director_v2_client = DirectorV2Client.get_from_app_state(app) + response: NodeGet | DynamicServiceGet | NodeGetIdle = ( + await director_v2_client.get_status(node_id) + ) + return response + + +async def run_dynamic_service( + app: FastAPI, *, dynamic_service_start: DynamicServiceStart +) -> NodeGet | DynamicServiceGet: + settings: ApplicationSettings = app.state.settings + if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: + raise NotImplementedError + + director_v2_client = DirectorV2Client.get_from_app_state(app) + response: NodeGet | DynamicServiceGet = ( + await director_v2_client.run_dynamic_service(dynamic_service_start) + ) + + await set_request_as_running(app, dynamic_service_start) + return response + + +async def stop_dynamic_service( + app: FastAPI, *, dynamic_service_stop: DynamicServiceStop +) -> None: + settings: ApplicationSettings = app.state.settings + if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: + raise NotImplementedError + + director_v2_client = DirectorV2Client.get_from_app_state(app) + await director_v2_client.stop_dynamic_service( + node_id=dynamic_service_stop.node_id, + simcore_user_agent=dynamic_service_stop.simcore_user_agent, + save_state=dynamic_service_stop.save_state, + timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT, + ) + + await set_request_as_stopped(app, dynamic_service_stop) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_deferred_get_status.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_deferred_get_status.py index 4cd8209d1ae..3f6efbfaecb 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_deferred_get_status.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_deferred_get_status.py @@ -12,8 +12,7 @@ from servicelib.deferred_tasks import BaseDeferredHandler, TaskUID from servicelib.deferred_tasks._base_deferred_handler import DeferredContext -from .. import service_tracker -from ..director_v2 import DirectorV2Client +from .. import scheduler_interface, service_tracker from ..notifier import notify_service_status_change _logger = logging.getLogger(__name__) @@ -47,9 +46,8 @@ async def run( app: FastAPI = context["app"] node_id: NodeID = context["node_id"] - director_v2_client: DirectorV2Client = DirectorV2Client.get_from_app_state(app) service_status: NodeGet | RunningDynamicServiceDetails | NodeGetIdle = ( - await director_v2_client.get_status(node_id) + await scheduler_interface.get_service_status(app, node_id=node_id) ) _logger.debug( "Service status type=%s, %s", type(service_status), service_status diff --git a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py index 7ee876e9e4b..ddd8bfd6e2d 100644 --- a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py +++ b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py @@ -20,6 +20,7 @@ from models_library.users import UserID from pydantic import TypeAdapter from pytest_mock import MockerFixture +from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict from servicelib.rabbitmq import RabbitMQRPCClient, RPCServerError from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler import services @@ -133,12 +134,35 @@ def mock_director_v2_service_state( yield None +@pytest.fixture( + params=[ + False, + pytest.param( + True, + marks=pytest.mark.xfail( + reason="INTERNAL scheduler implementation is missing" + ), + ), + ] +) +def use_internal_scheduler(request: pytest.FixtureRequest) -> bool: + return request.param + + @pytest.fixture def app_environment( app_environment: EnvVarsDict, rabbit_service: RabbitSettings, redis_service: RedisSettings, + use_internal_scheduler: bool, + monkeypatch: pytest.MonkeyPatch, ) -> EnvVarsDict: + setenvs_from_dict( + monkeypatch, + { + "DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER": f"{use_internal_scheduler}", + }, + ) return app_environment