Skip to content

Commit

Permalink
Merge branch 'main' into batch_endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT authored Dec 6, 2024
2 parents ccedf8c + b6b96de commit 25e4204
Show file tree
Hide file tree
Showing 29 changed files with 227 additions and 197 deletions.
69 changes: 29 additions & 40 deletions src/server/operandi_server/routers/admin_panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,22 @@

from operandi_server.models import PYUserInfo, WorkflowJobRsrc, WorkspaceRsrc, WorkflowRsrc
from operandi_utils.constants import AccountType, ServerApiTag
from operandi_utils.database import (
db_get_all_user_accounts, db_get_processing_stats, db_get_all_workflow_jobs_by_user,
db_get_workflow, db_get_workspace, db_get_all_workspaces_by_user, db_get_all_workflows_by_user
)
from operandi_utils.utils import send_bag_to_ola_hd
from .user import RouterUser
from .workspace_utils import create_workspace_bag, get_db_workspace_with_handling, validate_bag_with_handling
from operandi_utils.rabbitmq import get_connection_publisher
from .user_utils import get_user_accounts, get_user_processing_stats_with_handling, user_auth_with_handling
from .workflow_utils import get_user_workflows, get_user_workflow_jobs
from .workspace_utils import (
create_workspace_bag, get_user_workspaces, get_db_workspace_with_handling, validate_bag_with_handling
)

class RouterAdminPanel:
def __init__(self):
self.logger = getLogger("operandi_server.routers.user")
self.user_authenticator = RouterUser()
self.logger = getLogger("operandi_server.routers.admin_panel")

self.logger.info(f"Trying to connect RMQ Publisher")
self.rmq_publisher = get_connection_publisher(enable_acks=True)
self.logger.info(f"RMQPublisher connected")

self.router = APIRouter(tags=[ServerApiTag.ADMIN])
self.router.add_api_route(
path="/admin/users",
Expand All @@ -26,7 +30,7 @@ def __init__(self):
)
self.router.add_api_route(
path="/admin/processing_stats/{user_id}",
endpoint=self.get_processing_stats_for_user, methods=["GET"], status_code=status.HTTP_200_OK,
endpoint=self.user_processing_stats, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get processing stats for a specific user by user_id"
)
self.router.add_api_route(
Expand All @@ -50,8 +54,12 @@ def __init__(self):
summary="Push a workspace to Ola-HD service"
)

def __del__(self):
if self.rmq_publisher:
self.rmq_publisher.disconnect()

async def auth_admin_with_handling(self, auth: HTTPBasicCredentials):
py_user_action = await self.user_authenticator.user_login(auth)
py_user_action = await user_auth_with_handling(self.logger, auth)
if py_user_action.account_type != AccountType.ADMIN:
message = f"Admin privileges required for the endpoint"
self.logger.error(f"{message}")
Expand Down Expand Up @@ -81,60 +89,41 @@ async def push_to_ola_hd(self, workspace_id: str, auth: HTTPBasicCredentials = D
}
return response_message

async def get_users(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())):
async def get_users(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> List[PYUserInfo]:
await self.auth_admin_with_handling(auth)
users = await db_get_all_user_accounts()
return [PYUserInfo.from_db_user_account(user) for user in users]
return await get_user_accounts()

async def get_processing_stats_for_user(self, user_id: str, auth: HTTPBasicCredentials = Depends(HTTPBasic())):
async def user_processing_stats(self, user_id: str, auth: HTTPBasicCredentials = Depends(HTTPBasic())):
await self.auth_admin_with_handling(auth)
try:
db_processing_stats = await db_get_processing_stats(user_id)
if not db_processing_stats:
message = f"Processing stats not found for the user_id: {user_id}"
self.logger.error(message)
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
except Exception as error:
message = f"Failed to fetch processing stats for user_id: {user_id}, error: {error}"
self.logger.error(message)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=message)
return db_processing_stats
return await get_user_processing_stats_with_handling(self.logger, user_id=user_id)

async def user_workflow_jobs(
self, user_id: str, auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None, end_date: Optional[datetime] = None
) -> List:
) -> List[WorkflowJobRsrc]:
"""
The expected datetime format: YYYY-MM-DDTHH:MM:SS, for example, 2024-12-01T18:17:15
"""
await self.auth_admin_with_handling(auth)
db_workflow_jobs = await db_get_all_workflow_jobs_by_user(
user_id=user_id, start_date=start_date, end_date=end_date)
response = []
for db_workflow_job in db_workflow_jobs:
db_workflow = await db_get_workflow(db_workflow_job.workflow_id)
db_workspace = await db_get_workspace(db_workflow_job.workspace_id)
response.append(WorkflowJobRsrc.from_db_workflow_job(db_workflow_job, db_workflow, db_workspace))
return response
return await get_user_workflow_jobs(
self.logger, self.rmq_publisher, user_id, start_date, end_date)

async def user_workspaces(
self, user_id: str, auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None, end_date: Optional[datetime] = None
) -> List:
) -> List[WorkspaceRsrc]:
"""
The expected datetime format: YYYY-MM-DDTHH:MM:SS, for example, 2024-12-01T18:17:15
"""
await self.auth_admin_with_handling(auth)
db_workspaces = await db_get_all_workspaces_by_user(user_id=user_id, start_date=start_date, end_date=end_date)
return [WorkspaceRsrc.from_db_workspace(db_workspace) for db_workspace in db_workspaces]
return await get_user_workspaces(user_id=user_id, start_date=start_date, end_date=end_date)

async def user_workflows(
self, user_id: str, auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None, end_date: Optional[datetime] = None
) -> List:
) -> List[WorkflowRsrc]:
"""
The expected datetime format: YYYY-MM-DDTHH:MM:SS, for example, 2024-12-01T18:17:15
"""
await self.auth_admin_with_handling(auth)
db_workflows = await db_get_all_workflows_by_user(user_id=user_id, start_date=start_date, end_date=end_date)
return [WorkflowRsrc.from_db_workflow(db_workflow) for db_workflow in db_workflows]
return await get_user_workflows(user_id=user_id, start_date=start_date, end_date=end_date)
9 changes: 4 additions & 5 deletions src/server/operandi_server/routers/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
from operandi_utils.constants import ServerApiTag
from operandi_utils.oton.constants import OCRD_ALL_JSON
from operandi_server.models import PYDiscovery
from .user import RouterUser
from .user_utils import user_auth_with_handling


class RouterDiscovery:
def __init__(self):
self.logger = getLogger("operandi_server.routers.discovery")
self.user_authenticator = RouterUser()

self.router = APIRouter(tags=[ServerApiTag.DISCOVERY])
self.router.add_api_route(
Expand All @@ -39,7 +38,7 @@ def __init__(self):
)

async def discovery(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> PYDiscovery:
await self.user_authenticator.user_login(auth)
await user_auth_with_handling(self.logger, auth)
response = PYDiscovery(
ram=virtual_memory().total / (1024.0 ** 3),
cpu_cores=cpu_count(),
Expand All @@ -52,7 +51,7 @@ async def discovery(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) ->
return response

async def get_processor_names(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> List[str]:
await self.user_authenticator.user_login(auth)
await user_auth_with_handling(self.logger, auth)
try:
processor_names = list(OCRD_ALL_JSON.keys())
return processor_names
Expand All @@ -65,7 +64,7 @@ async def get_processor_names(self, auth: HTTPBasicCredentials = Depends(HTTPBas
raise HTTPException(status_code=500, detail="An unexpected error occurred while loading processor names.")

async def get_processor_info(self, processor_name: str, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> Dict:
await self.user_authenticator.user_login(auth)
await user_auth_with_handling(self.logger, auth)
try:
if processor_name not in OCRD_ALL_JSON:
raise HTTPException(status_code=404, detail=f"Processor '{processor_name}' not found.")
Expand Down
19 changes: 19 additions & 0 deletions src/server/operandi_server/routers/password_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from hashlib import sha512
from random import random
from typing import Tuple

def get_random_salt() -> str:
return sha512(f"{hash(str(random()))}".encode("utf-8")).hexdigest()[:8]

def get_hex_digest(salt: str, plain_password: str):
return sha512(f"{salt}{plain_password}".encode("utf-8")).hexdigest()

def encrypt_password(plain_password: str) -> Tuple[str, str]:
salt = get_random_salt()
hashed_password = get_hex_digest(salt, plain_password)
encrypted_password = f"{salt}${hashed_password}"
return salt, encrypted_password

def validate_password(plain_password: str, encrypted_password: str) -> bool:
salt, hashed_password = encrypted_password.split(sep='$', maxsplit=1)
return hashed_password == get_hex_digest(salt, plain_password)
76 changes: 28 additions & 48 deletions src/server/operandi_server/routers/user.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
from logging import getLogger
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import APIRouter, Depends, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

from operandi_utils.constants import AccountType, ServerApiTag
from operandi_utils.database import (
db_get_processing_stats, db_get_all_workflow_jobs_by_user, db_get_user_account_with_email,
db_get_workflow, db_get_workspace, db_get_all_workspaces_by_user, db_get_all_workflows_by_user
)
from operandi_server.exceptions import AuthenticationError
from operandi_server.models import PYUserAction, WorkflowJobRsrc, WorkspaceRsrc, WorkflowRsrc
from operandi_utils.database.models import DBProcessingStatistics
from .user_utils import user_auth, user_register_with_handling
from operandi_utils.rabbitmq import get_connection_publisher
from .workflow_utils import get_user_workflows, get_user_workflow_jobs
from .workspace_utils import get_user_workspaces
from .user_utils import get_user_processing_stats_with_handling, user_auth_with_handling, user_register_with_handling


class RouterUser:
def __init__(self):
self.logger = getLogger("operandi_server.routers.user")

self.logger.info(f"Trying to connect RMQ Publisher")
self.rmq_publisher = get_connection_publisher(enable_acks=True)
self.logger.info(f"RMQPublisher connected")

self.router = APIRouter(tags=[ServerApiTag.USER])
self.router.add_api_route(
path="/user/register",
Expand Down Expand Up @@ -56,31 +59,23 @@ def __init__(self):
response_model=List, response_model_exclude_unset=True, response_model_exclude_none=True
)

def __del__(self):
if self.rmq_publisher:
self.rmq_publisher.disconnect()

async def user_login(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> PYUserAction:
"""
Used for user authentication.
"""
email = auth.username
password = auth.password
headers = {"WWW-Authenticate": "Basic"}
if not (email and password):
message = f"User login failed, missing e-mail or password field."
self.logger.error(f"{message}")
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, headers=headers, detail=message)
try:
db_user_account = await user_auth(email=email, password=password)
except AuthenticationError as error:
self.logger.error(f"{error}")
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, headers=headers, detail=str(error))
return PYUserAction.from_db_user_account(action="Successfully logged!", db_user_account=db_user_account)
return await user_auth_with_handling(logger=self.logger, auth=auth)

async def user_register(
self, email: str, password: str, institution_id: str, account_type: AccountType = AccountType.USER,
details: str = "User Account"
) -> PYUserAction:
"""
Used for registration.
There are 3 account types:
There are 4 account types:
1) ADMIN
2) USER
3) HARVESTER
Expand All @@ -101,51 +96,36 @@ async def user_register(
return PYUserAction.from_db_user_account(action=action, db_user_account=db_user_account)

async def user_processing_stats(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())):
await self.user_login(auth)
db_user_account = await db_get_user_account_with_email(email=auth.username)
db_processing_stats = await db_get_processing_stats(db_user_account.user_id)
return db_processing_stats
py_user_action = await user_auth_with_handling(self.logger, auth)
return await get_user_processing_stats_with_handling(self.logger, user_id=py_user_action.user_id)

async def user_workflow_jobs(
self, auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None, end_date: Optional[datetime] = None
) -> List:
) -> List[WorkflowJobRsrc]:
"""
The expected datetime format: YYYY-MM-DDTHH:MM:SS, for example, 2024-12-01T18:17:15
"""
await self.user_login(auth)
db_user_account = await db_get_user_account_with_email(email=auth.username)
db_workflow_jobs = await db_get_all_workflow_jobs_by_user(
user_id=db_user_account.user_id, start_date=start_date, end_date=end_date)
response = []
for db_workflow_job in db_workflow_jobs:
db_workflow = await db_get_workflow(db_workflow_job.workflow_id)
db_workspace = await db_get_workspace(db_workflow_job.workspace_id)
response.append(WorkflowJobRsrc.from_db_workflow_job(db_workflow_job, db_workflow, db_workspace))
return response
py_user_action = await user_auth_with_handling(self.logger, auth)
return await get_user_workflow_jobs(
self.logger, self.rmq_publisher, py_user_action.user_id, start_date, end_date)

async def user_workspaces(
self, auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None, end_date: Optional[datetime] = None
) -> List:
) -> List[WorkspaceRsrc]:
"""
The expected datetime format: YYYY-MM-DDTHH:MM:SS, for example, 2024-12-01T18:17:15
"""
await self.user_login(auth)
db_user_account = await db_get_user_account_with_email(email=auth.username)
db_workspaces = await db_get_all_workspaces_by_user(
user_id=db_user_account.user_id, start_date=start_date, end_date=end_date)
return [WorkspaceRsrc.from_db_workspace(db_workspace) for db_workspace in db_workspaces]
py_user_action = await user_auth_with_handling(self.logger, auth)
return await get_user_workspaces(user_id=py_user_action.user_id, start_date=start_date, end_date=end_date)

async def user_workflows(
self, auth: HTTPBasicCredentials = Depends(HTTPBasic()),
start_date: Optional[datetime] = None, end_date: Optional[datetime] = None
) -> List:
) -> List[WorkflowRsrc]:
"""
The expected datetime format: YYYY-MM-DDTHH:MM:SS, for example, 2024-12-01T18:17:15
"""
await self.user_login(auth)
db_user_account = await db_get_user_account_with_email(email=auth.username)
db_workflows = await db_get_all_workflows_by_user(
user_id=db_user_account.user_id, start_date=start_date, end_date=end_date)
return [WorkflowRsrc.from_db_workflow(db_workflow) for db_workflow in db_workflows]
py_user_action = await user_auth_with_handling(self.logger, auth)
return await get_user_workflows(user_id=py_user_action.user_id, start_date=start_date, end_date=end_date)
Loading

0 comments on commit 25e4204

Please sign in to comment.