Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨 adding new scheduling mode to dynamic-scheduler ⚠️ #6889

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ DIRECTOR_V2_TRACING={}
# DYNAMIC_SCHEDULER ----
DYNAMIC_SCHEDULER_LOGLEVEL=DEBUG
DYNAMIC_SCHEDULER_PROFILING=1
DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER=0
DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT=01:00:00
DYNAMIC_SCHEDULER_TRACING={}

Expand Down
1 change: 1 addition & 0 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ services:
REDIS_PASSWORD: ${REDIS_PASSWORD}
DIRECTOR_V2_HOST: ${DIRECTOR_V2_HOST}
DIRECTOR_V2_PORT: ${DIRECTOR_V2_PORT}
DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: ${DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER}
DYNAMIC_SCHEDULER_LOGLEVEL: ${DYNAMIC_SCHEDULER_LOGLEVEL}
DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: ${DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT}
DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
ServiceWasNotFoundError,
)

from ...core.settings import ApplicationSettings
from ...services.director_v2 import DirectorV2Client
from ...services.service_tracker import set_request_as_running, set_request_as_stopped
from ...services import scheduler_interface

router = RPCRouter()

Expand All @@ -23,23 +21,16 @@
async def get_service_status(
app: FastAPI, *, node_id: NodeID
) -> NodeGet | DynamicServiceGet | NodeGetIdle:
director_v2_client = DirectorV2Client.get_from_app_state(app)
response: NodeGet | DynamicServiceGet | NodeGetIdle = (
await director_v2_client.get_status(node_id)
)
return response
return await scheduler_interface.get_service_status(app, node_id=node_id)


@router.expose()
async def run_dynamic_service(
app: FastAPI, *, dynamic_service_start: DynamicServiceStart
) -> NodeGet | DynamicServiceGet:
director_v2_client = DirectorV2Client.get_from_app_state(app)
response: NodeGet | DynamicServiceGet = (
await director_v2_client.run_dynamic_service(dynamic_service_start)
return await scheduler_interface.run_dynamic_service(
app, dynamic_service_start=dynamic_service_start
)
await set_request_as_running(app, dynamic_service_start)
return response


@router.expose(
Expand All @@ -51,12 +42,6 @@ async def run_dynamic_service(
async def stop_dynamic_service(
app: FastAPI, *, dynamic_service_stop: DynamicServiceStop
) -> None:
director_v2_client = DirectorV2Client.get_from_app_state(app)
settings: ApplicationSettings = app.state.settings
await director_v2_client.stop_dynamic_service(
node_id=dynamic_service_stop.node_id,
simcore_user_agent=dynamic_service_stop.simcore_user_agent,
save_state=dynamic_service_stop.save_state,
timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT,
return await scheduler_interface.stop_dynamic_service(
app, dynamic_service_stop=dynamic_service_stop
)
await set_request_as_stopped(app, dynamic_service_stop)
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
),
)

DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: bool = Field(
default=False,
description=(
"this is a way to switch between different dynamic schedulers for the new style services"
# NOTE: this option should be removed when the scheduling will be done via this service
),
)

@field_validator("DYNAMIC_SCHEDULER_LOGLEVEL", mode="before")
@classmethod
def _validate_log_level(cls, value: str) -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects_nodes_io import NodeID

from ..core.settings import ApplicationSettings
from .director_v2 import DirectorV2Client
from .service_tracker import set_request_as_running, set_request_as_stopped


async def get_service_status(
app: FastAPI, *, node_id: NodeID
) -> NodeGet | DynamicServiceGet | NodeGetIdle:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
raise NotImplementedError

director_v2_client = DirectorV2Client.get_from_app_state(app)
response: NodeGet | DynamicServiceGet | NodeGetIdle = (
await director_v2_client.get_status(node_id)
)
return response


async def run_dynamic_service(
app: FastAPI, *, dynamic_service_start: DynamicServiceStart
) -> NodeGet | DynamicServiceGet:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
raise NotImplementedError

director_v2_client = DirectorV2Client.get_from_app_state(app)
response: NodeGet | DynamicServiceGet = (
await director_v2_client.run_dynamic_service(dynamic_service_start)
)

await set_request_as_running(app, dynamic_service_start)
return response


async def stop_dynamic_service(
app: FastAPI, *, dynamic_service_stop: DynamicServiceStop
) -> None:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
raise NotImplementedError

director_v2_client = DirectorV2Client.get_from_app_state(app)
settings: ApplicationSettings = app.state.settings
await director_v2_client.stop_dynamic_service(
node_id=dynamic_service_stop.node_id,
simcore_user_agent=dynamic_service_stop.simcore_user_agent,
save_state=dynamic_service_stop.save_state,
timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT,
)

await set_request_as_stopped(app, dynamic_service_stop)
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
from servicelib.deferred_tasks import BaseDeferredHandler, TaskUID
from servicelib.deferred_tasks._base_deferred_handler import DeferredContext

from .. import service_tracker
from ..director_v2 import DirectorV2Client
from .. import scheduler_interface, service_tracker
from ..notifier import notify_service_status_change

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -47,9 +46,8 @@ async def run(
app: FastAPI = context["app"]
node_id: NodeID = context["node_id"]

director_v2_client: DirectorV2Client = DirectorV2Client.get_from_app_state(app)
service_status: NodeGet | RunningDynamicServiceDetails | NodeGetIdle = (
await director_v2_client.get_status(node_id)
await scheduler_interface.get_service_status(app, node_id=node_id)
)
_logger.debug(
"Service status type=%s, %s", type(service_status), service_status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from models_library.users import UserID
from pydantic import TypeAdapter
from pytest_mock import MockerFixture
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
from pytest_simcore.helpers.typing_env import EnvVarsDict
from servicelib.rabbitmq import RabbitMQRPCClient, RPCServerError
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler import services
Expand Down Expand Up @@ -133,12 +134,31 @@ def mock_director_v2_service_state(
yield None


@pytest.fixture(
params=[
False,
# NOTE: enable below when INTERNAL scheduler is implemented
# True,
GitHK marked this conversation as resolved.
Show resolved Hide resolved
]
)
def use_internal_scheduler(request: pytest.FixtureRequest) -> bool:
return request.param


@pytest.fixture
def app_environment(
app_environment: EnvVarsDict,
rabbit_service: RabbitSettings,
redis_service: RedisSettings,
use_internal_scheduler: bool,
monkeypatch: pytest.MonkeyPatch,
) -> EnvVarsDict:
setenvs_from_dict(
monkeypatch,
{
"DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER": f"{use_internal_scheduler}",
},
)
return app_environment


Expand Down
Loading