diff --git a/src/server/operandi_server/routers/workflow.py b/src/server/operandi_server/routers/workflow.py index f06ca74..6c2ce96 100644 --- a/src/server/operandi_server/routers/workflow.py +++ b/src/server/operandi_server/routers/workflow.py @@ -7,7 +7,7 @@ from tempfile import mkdtemp from typing import List -from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status, UploadFile +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status, UploadFile, Form from fastapi.responses import FileResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from starlette.status import HTTP_404_NOT_FOUND @@ -65,6 +65,12 @@ def __init__(self): response_model_exclude_none=True, summary="Upload a nextflow workflow script. Returns a `resource_id` associated with the uploaded script." ) + self.router.add_api_route( + path="/batch-workflows", endpoint=self.upload_batch_workflow_scripts, methods=["POST"], + status_code=status.HTTP_201_CREATED, + summary="Upload a list of nextflow workflow script (limit:5). Returns a list of `resource_id` associated with the uploaded script.", + response_model=None + ) self.router.add_api_route( path="/workflow/{workflow_id}", endpoint=self.submit_to_rabbitmq_queue, methods=["POST"], status_code=status.HTTP_201_CREATED, @@ -489,3 +495,56 @@ async def convert_txt_to_nextflow( await validate_oton_with_handling(self.logger, ocrd_process_txt) await convert_oton_with_handling(self.logger, ocrd_process_txt, nf_script_dest, environment, with_mets_server) return FileResponse(nf_script_dest, filename=f'{oton_id}.nf', media_type="application/txt-file") + + async def upload_batch_workflow_scripts( + self, + workflows: List[UploadFile], + auth: HTTPBasicCredentials = Depends(HTTPBasic()) + ) -> List[WorkflowRsrc]: + """ + Curl equivalent: + `curl -X POST SERVER_ADDR/batch-workflows \ + -F "workflows=@workflow1.nf" \ + -F "workflows=@workflow2.nf" \ + -F "workflows=@workflow3.nf"` + """ + py_user_action = await self.user_authenticator.user_login(auth) + + if len(workflows) > 5: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Batch submission limit exceeded. Maximum 5 workflows allowed." + ) + + workflow_resources = [] + for workflow in workflows: + try: + workflow_id, workflow_dir = create_resource_dir(SERVER_WORKFLOWS_ROUTER) + nf_script_dest = join(workflow_dir, workflow.filename) + + await receive_resource(file=workflow, resource_dst=nf_script_dest) + + uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest) + executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest) + + db_workflow = await db_create_workflow( + user_id=py_user_action.user_id, + workflow_id=workflow_id, + workflow_dir=workflow_dir, + workflow_script_path=nf_script_dest, + workflow_script_base=workflow.filename, + uses_mets_server=uses_mets_server, + executable_steps=executable_steps, + details=f"Batch uploaded workflow: {workflow.filename}" + ) + + workflow_resources.append(WorkflowRsrc.from_db_workflow(db_workflow)) + + except Exception as error: + self.logger.error(f"Failed to process workflow {workflow.filename}: {error}") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Failed to process workflow {workflow.filename}: {error}" + ) + + return workflow_resources