Skip to content

Commit

Permalink
Implement Endpoints for User and Admin Workflow Management (#21)
Browse files Browse the repository at this point in the history
* feat: add endpoint GET /user/workflow_jobs to fetch workflow_job_ids for specific user

* fix: correct typing for return value in db_get_all_job_ids_by_user

* Update: GET /user/workflow_jobs - that returns all workflow jobs submitted by that specific user.

* Update: CHange functions signature.

* Fix: CIrcular IMport Error

* fix circular import

* Feat: Add start_date and end_date for getting jobs in selected date range.

* Add: GET /user/worspaces - that returns all workspaces submitted by that specific user.

* Update: Remove debugging code

* Add: GET /user/workflows - that returns all workflows submitted by that specific user.

* Add: GET /admin/{user_id}/workflow_jobs - admin can list all workflow jobs for a specific user identified with user_id

* Add: GET /admin/{user_id}/workspaces - admin can list all workspaces for a specific user identified with user_id

* Add: GET /admin/{user_id}/workflows - admin can list all workflows for a specific user identified with user_id

---------

Co-authored-by: Mehmed Mustafa <[email protected]>
  • Loading branch information
Faizan-hub and MehmedGIT authored Dec 2, 2024
1 parent 5ae5db8 commit a6b4842
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 9 deletions.
98 changes: 94 additions & 4 deletions src/server/operandi_server/routers/admin_panel.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from logging import getLogger
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

from operandi_server.models import PYUserInfo
from operandi_server.models import PYUserInfo, WorkflowJobRsrc, WorkspaceRsrc, WorkflowRsrc
from operandi_utils.constants import AccountType, ServerApiTag
from operandi_utils.database import db_get_all_user_accounts, db_get_processing_stats
from operandi_utils.database import (
db_get_all_user_accounts, db_get_processing_stats, db_get_all_jobs_by_user,
db_get_workflow, db_get_workspace, db_get_all_workspaces_by_user, db_get_all_workflows_by_user

)
from operandi_utils.utils import send_bag_to_ola_hd
from .user import RouterUser
from .workspace_utils import create_workspace_bag, get_db_workspace_with_handling, validate_bag_with_handling
Expand All @@ -30,7 +36,21 @@ def __init__(self):
endpoint=self.get_processing_stats_for_user, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get processing stats for a specific user by user_id"
)

self.router.add_api_route(
path="/admin/{user_id}/workflow_jobs",
endpoint=self.user_workflow_jobs, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workflow jobs submitted by the user identified by user_id"
)
self.router.add_api_route(
path="/admin/{user_id}/workspaces",
endpoint=self.user_workspaces, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workspaces submitted by the user identified by user_id"
)
self.router.add_api_route(
path="/admin/{user_id}/workflows",
endpoint=self.user_workflows, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workflows submitted by the user identified by user_id"
)
async def push_to_ola_hd(self, workspace_id: str, auth: HTTPBasicCredentials = Depends(HTTPBasic())):
py_user_action = await self.user_authenticator.user_login(auth)
if py_user_action.account_type != AccountType.ADMIN:
Expand Down Expand Up @@ -91,4 +111,74 @@ async def get_processing_stats_for_user(self, user_id: str, auth: HTTPBasicCrede
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=message)

# Return the processing stats in the response model
return db_processing_stats
return db_processing_stats
async def user_workflow_jobs(
self,
user_id: str,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:
# Authenticate the admin user
py_user_action = await self.user_authenticator.user_login(auth)
if py_user_action.account_type != AccountType.ADMIN:
message = f"Admin privileges required for the endpoint"
self.logger.error(f"{message}")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message)
# Retrieve workflow jobs for the user identified with user_id with optional date filtering
db_workflow_jobs = await db_get_all_jobs_by_user(
user_id=user_id,
start_date=start_date,
end_date=end_date
)
response = []
for db_workflow_job in db_workflow_jobs:
db_workflow = await db_get_workflow(db_workflow_job.workflow_id)
db_workspace = await db_get_workspace(db_workflow_job.workspace_id)
response.append(WorkflowJobRsrc.from_db_workflow_job(db_workflow_job, db_workflow, db_workspace))
return response


async def user_workspaces(
self,
user_id: str,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:
# Authenticate the admin user
py_user_action = await self.user_authenticator.user_login(auth)
if py_user_action.account_type != AccountType.ADMIN:
message = f"Admin privileges required for the endpoint"
self.logger.error(f"{message}")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message)
# Retrieve workspaces for the user with optional date filtering
db_workspaces = await db_get_all_workspaces_by_user(
user_id=user_id,
start_date=start_date,
end_date=end_date
)

return [WorkspaceRsrc.from_db_workspace(db_workspace) for db_workspace in db_workspaces]

async def user_workflows(
self,
user_id: str,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:
# Authenticate the admin user
py_user_action = await self.user_authenticator.user_login(auth)
if py_user_action.account_type != AccountType.ADMIN:
message = f"Admin privileges required for the endpoint"
self.logger.error(f"{message}")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message)
# Retrieve workspaces for the user with optional date filtering
db_workflows = await db_get_all_workflows_by_user(
user_id=user_id,
start_date=start_date,
end_date=end_date
)

return [WorkflowRsrc.from_db_workflow(db_workflow) for db_workflow in db_workflows]
89 changes: 87 additions & 2 deletions src/server/operandi_server/routers/user.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from logging import getLogger
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

from operandi_utils.constants import AccountType, ServerApiTag
from operandi_utils.database import db_get_processing_stats, db_get_user_account_with_email
from operandi_utils.database import (
db_get_processing_stats, db_get_all_jobs_by_user, db_get_user_account_with_email,
db_get_workflow, db_get_workspace, db_get_all_workspaces_by_user, db_get_all_workflows_by_user
)
from operandi_server.exceptions import AuthenticationError
from operandi_server.models import PYUserAction
from operandi_server.models import PYUserAction, WorkflowJobRsrc, WorkspaceRsrc, WorkflowRsrc
from operandi_utils.database.models import DBProcessingStatistics
from .user_utils import user_auth, user_register_with_handling

Expand All @@ -32,6 +37,25 @@ def __init__(self):
summary="Get user account statistics of the current account",
response_model=DBProcessingStatistics, response_model_exclude_unset=True, response_model_exclude_none=True
)
self.router.add_api_route(
path="/user/workflow_jobs",
endpoint=self.user_workflow_jobs, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workflow jobs submitted by the user",
response_model=List, response_model_exclude_unset=True, response_model_exclude_none=True
)
self.router.add_api_route(
path="/user/workspaces",
endpoint=self.user_workspaces, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workspaces submitted by the user",
response_model=List, response_model_exclude_unset=True, response_model_exclude_none=True
)
self.router.add_api_route(
path="/user/workflows",
endpoint=self.user_workflows, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workflows submitted by the user",
response_model=List, response_model_exclude_unset=True, response_model_exclude_none=True
)


async def user_login(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> PYUserAction:
"""
Expand Down Expand Up @@ -82,3 +106,64 @@ async def user_processing_stats(self, auth: HTTPBasicCredentials = Depends(HTTPB
db_user_account = await db_get_user_account_with_email(email=auth.username)
db_processing_stats = await db_get_processing_stats(db_user_account.user_id)
return db_processing_stats

async def user_workflow_jobs(
self,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:
await self.user_login(auth)
# Fetch user account details
db_user_account = await db_get_user_account_with_email(email=auth.username)
# Retrieve workflow jobs for the user with optional date filtering
db_workflow_jobs = await db_get_all_jobs_by_user(
user_id=db_user_account.user_id,
start_date=start_date,
end_date=end_date
)
response = []
for db_workflow_job in db_workflow_jobs:
db_workflow = await db_get_workflow(db_workflow_job.workflow_id)
db_workspace = await db_get_workspace(db_workflow_job.workspace_id)
response.append(WorkflowJobRsrc.from_db_workflow_job(db_workflow_job, db_workflow, db_workspace))
return response


async def user_workspaces(
self,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:

await self.user_login(auth)
# Fetch user account details
db_user_account = await db_get_user_account_with_email(email=auth.username)
# Retrieve workspaces for the user with optional date filtering
db_workspaces = await db_get_all_workspaces_by_user(
user_id=db_user_account.user_id,
start_date=start_date,
end_date=end_date
)

return [WorkspaceRsrc.from_db_workspace(db_workspace) for db_workspace in db_workspaces]

async def user_workflows(
self,
auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List:

await self.user_login(auth)
# Fetch user account details
db_user_account = await db_get_user_account_with_email(email=auth.username)
# Retrieve workflow for the user with optional date filtering
db_workflows = await db_get_all_workflows_by_user(
user_id=db_user_account.user_id,
start_date=start_date,
end_date=end_date
)

return [WorkflowRsrc.from_db_workflow(db_workflow) for db_workflow in db_workflows]
14 changes: 13 additions & 1 deletion src/utils/operandi_utils/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
"db_get_user_account",
"db_get_user_account_with_email",
"db_get_workflow",
"db_get_all_workflows_by_user",
"db_get_workflow_job",
"db_get_all_jobs_by_user",
"db_get_workspace",
"db_get_all_workspaces_by_user",
"db_increase_processing_stats",
"db_increase_processing_stats_with_handling",
"db_initiate_database",
Expand All @@ -38,8 +41,11 @@
"sync_db_get_user_account",
"sync_db_get_user_account_with_email",
"sync_db_get_workflow",
"sync_db_get_all_workflows_by_user",
"sync_db_get_workflow_job",
"sync_db_get_all_jobs_by_user",
"sync_db_get_workspace",
"sync_db_get_all_workspaces_by_user",
"sync_db_increase_processing_stats",
"sync_db_initiate_database",
"sync_db_update_hpc_slurm_job",
Expand Down Expand Up @@ -74,26 +80,32 @@
from .db_workflow import (
db_create_workflow,
db_get_workflow,
db_get_all_workflows_by_user,
db_update_workflow,
sync_db_create_workflow,
sync_db_get_workflow,
sync_db_get_all_workflows_by_user,
sync_db_update_workflow
)
from .db_workflow_job import (
db_create_workflow_job,
db_get_workflow_job,
db_get_all_jobs_by_user,
db_update_workflow_job,
sync_db_create_workflow_job,
sync_db_get_workflow_job,
sync_db_get_all_jobs_by_user,
sync_db_update_workflow_job
)
from .db_workspace import (
db_create_workspace,
db_get_workspace,
db_get_all_workspaces_by_user,
db_update_workspace,
sync_db_create_workspace,
sync_db_get_workspace,
sync_db_update_workspace
sync_db_update_workspace,
sync_db_get_all_workspaces_by_user
)
from .db_processing_statistics import (
db_create_processing_stats,
Expand Down
21 changes: 21 additions & 0 deletions src/utils/operandi_utils/database/db_workflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from typing import List, Optional
from operandi_utils import call_sync
from .models import DBWorkflow

Expand Down Expand Up @@ -48,6 +49,22 @@ async def db_get_workflow(workflow_id: str) -> DBWorkflow:
raise RuntimeError(f"No DB workflow entry found for id: {workflow_id}")
return db_workflow

async def db_get_all_workflows_by_user(user_id: str, start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> List[DBWorkflow]:
# Start with the user_id filter
query = {"user_id": user_id}

# Add date filters conditionally
if start_date or end_date:
query["datetime"] = {}
if start_date:
query["datetime"]["$gte"] = start_date
if end_date:
query["datetime"]["$lte"] = end_date

# Execute the query
db_workflows = await DBWorkflow.find_many(query).to_list()
return db_workflows

@call_sync
async def sync_db_get_workflow(workflow_id: str) -> DBWorkflow:
Expand Down Expand Up @@ -83,3 +100,7 @@ async def db_update_workflow(find_workflow_id: str, **kwargs) -> DBWorkflow:
@call_sync
async def sync_db_update_workflow(find_workflow_id: str, **kwargs) -> DBWorkflow:
return await db_update_workflow(find_workflow_id=find_workflow_id, **kwargs)

@call_sync
async def sync_db_get_all_workflows_by_user(user_id: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None) -> List[DBWorkflow]:
return await db_get_all_workflows_by_user(user_id, start_date, end_date)
25 changes: 24 additions & 1 deletion src/utils/operandi_utils/database/db_workflow_job.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from datetime import datetime
from typing import List, Optional
from operandi_utils import call_sync
from operandi_utils.constants import StateJob
from .models import DBWorkflowJob
from operandi_utils.database.models import DBWorkflowJob


async def db_create_workflow_job(
Expand Down Expand Up @@ -37,6 +38,24 @@ async def db_get_workflow_job(job_id: str) -> DBWorkflowJob:
return db_workflow_job


async def db_get_all_jobs_by_user(user_id: str, start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> List[DBWorkflowJob]:
# Start with the user_id filter
query = {"user_id": user_id}

# Add date filters conditionally
if start_date or end_date:
query["datetime"] = {}
if start_date:
query["datetime"]["$gte"] = start_date
if end_date:
query["datetime"]["$lte"] = end_date

# Execute the query
db_workflow_jobs = await DBWorkflowJob.find_many(query).to_list()
return db_workflow_jobs


@call_sync
async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob:
return await db_get_workflow_job(job_id)
Expand Down Expand Up @@ -77,3 +96,7 @@ async def db_update_workflow_job(find_job_id: str, **kwargs) -> DBWorkflowJob:
@call_sync
async def sync_db_update_workflow_job(find_job_id: str, **kwargs) -> DBWorkflowJob:
return await db_update_workflow_job(find_job_id=find_job_id, **kwargs)

@call_sync
async def sync_db_get_all_jobs_by_user(user_id: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None) -> List[DBWorkflowJob]:
return await db_get_all_jobs_by_user(user_id, start_date, end_date)
Loading

0 comments on commit a6b4842

Please sign in to comment.