Skip to content

Commit

Permalink
migrate to new long running task
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Aug 30, 2022
1 parent 82816e3 commit 13afd5b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from functools import wraps
from typing import Any

from aiohttp import web
from models_library.users import UserID
Expand All @@ -8,14 +7,7 @@
from servicelib.aiohttp.long_running_tasks._server import (
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
)
from servicelib.aiohttp.long_running_tasks.server import (
TaskGet,
TaskProtocol,
create_task_name_from_request,
get_tasks_manager,
setup,
start_task,
)
from servicelib.aiohttp.long_running_tasks.server import setup
from servicelib.aiohttp.typing_extension import Handler

from ._constants import RQ_PRODUCT_KEY
Expand All @@ -28,31 +20,6 @@ class _RequestContext(BaseModel):
product_name: str = Field(..., alias=RQ_PRODUCT_KEY)


def start_task_with_context(
request: web.Request, task: TaskProtocol, **task_kwargs: Any
) -> TaskGet:
req_ctx = _RequestContext.parse_obj(request)
task_manager = get_tasks_manager(request.app)
task_name = create_task_name_from_request(request)
task_id = start_task(
task_manager,
task,
task_context=jsonable_encoder(req_ctx),
task_name=task_name,
**task_kwargs,
)
status_url = request.app.router["get_task_status"].url_for(task_id=task_id)
result_url = request.app.router["get_task_result"].url_for(task_id=task_id)
abort_url = request.app.router["cancel_and_delete_task"].url_for(task_id=task_id)
return TaskGet(
task_id=task_id,
task_name=task_name,
status_href=f"{status_url}",
result_href=f"{result_url}",
abort_href=f"{abort_url}",
)


def _webserver_request_context_decorator(handler: Handler):
@wraps(handler)
async def _test_task_context_decorator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
from models_library.utils.fastapi_encoders import jsonable_encoder
from pydantic import BaseModel, Extra, Field, NonNegativeInt, parse_obj_as
from servicelib.aiohttp.long_running_tasks.server import (
TaskGet,
TaskProgress,
get_tasks_manager,
start_long_running_task,
)
from servicelib.aiohttp.requests_validation import (
parse_request_path_parameters_as,
Expand All @@ -39,7 +38,6 @@
from .._meta import api_version_prefix as VTAG
from ..application_settings import get_settings
from ..login.decorators import RQT_USERID_KEY, login_required
from ..long_running_tasks import start_task_with_context
from ..resource_manager.websocket_manager import PROJECT_ID_KEY, managed_resource
from ..rest_constants import RESPONSE_MODEL_POLICY
from ..security_api import check_permission
Expand Down Expand Up @@ -132,30 +130,15 @@ async def create_projects(request: web.Request):
if query_params.as_template: # create template from
await check_permission(request, "project.template.create")

task_get: Optional[TaskGet] = None
try:
task_get = start_task_with_context(
request,
_create_projects,
app=request.app,
query_params=query_params,
user_id=req_ctx.user_id,
predefined_project=predefined_project,
)

return web.json_response(
data={"data": task_get},
status=web.HTTPAccepted.status_code,
dumps=json_dumps,
)
except asyncio.CancelledError:
# cancel the task, the client has disconnected
if task_get:
task_manager = get_tasks_manager(request.app)
await task_manager.cancel_task(
task_get.task_id, with_task_context=jsonable_encoder(req_ctx)
)
raise
return start_long_running_task(
request,
_create_projects,
task_context=jsonable_encoder(req_ctx),
app=request.app,
query_params=query_params,
user_id=req_ctx.user_id,
predefined_project=predefined_project,
)


async def _init_project_from_request(
Expand Down

0 comments on commit 13afd5b

Please sign in to comment.