From 3cdfacf2e14da516d373c8af615b8855dceecf79 Mon Sep 17 00:00:00 2001 From: Danielle Mayabb Date: Wed, 27 Sep 2023 08:33:11 -0700 Subject: [PATCH] Implement user check and start of permissions - Add get_current_user to dependencies that introspects token and builds user info based on username in token - Add config values to indicate use of OIDC roles (passed in token) or roles in local db - Add the user dependency to template endpoints and check user id against logged in user - Update user models and schema as needed - Add user service to get users and user roles --- .env.example | 3 +++ api/dependencies.py | 48 +++++++++++++++++++++++++++++++++++++-- api/models/user.py | 13 +++++++---- api/routers/v1/timelog.py | 34 ++++++++++++++++++++------- api/schemas/user.py | 24 ++++++++++++++++++++ api/services/user.py | 18 +++++++++++++++ 6 files changed, 125 insertions(+), 15 deletions(-) create mode 100644 api/schemas/user.py create mode 100644 api/services/user.py diff --git a/.env.example b/.env.example index 2169059e9..a49a1d06f 100644 --- a/.env.example +++ b/.env.example @@ -25,6 +25,9 @@ OIDC_AUTHORITY= OIDC_CLIENT_ID= OIDC_USERNAME_PROPERTY= OIDC_CERTS_URL= +OIDC_TOKEN_ENDPOINT= +USE_OIDC_ROLES= +OIDC_ROLES_PROPERTY= JWT_ALGORITHM= JWT_SECRET= JWT_AUDIENCE= diff --git a/api/dependencies.py b/api/dependencies.py index 218ddc6f6..3e74df489 100644 --- a/api/dependencies.py +++ b/api/dependencies.py @@ -1,3 +1,47 @@ -# from typing import Annotated +from typing import Annotated, Optional, List +from fastapi import Depends, HTTPException +from sqlalchemy.orm import Session +from fastapi.security import OAuth2PasswordBearer +from pydantic import BaseModel +from decouple import config +from db.db_connection import get_db +from auth.auth_handler import decode_token +from services.user import UserService -# from fastapi import Header, HTTPException + +OIDC_TOKEN_ENDPOINT = config("OIDC_TOKEN_ENDPOINT") +OIDC_USERNAME_PROPERTY = config("OIDC_USERNAME_PROPERTY") +USE_OIDC_ROLES = config("USE_OIDC_ROLES") +OIDC_ROLES_PROPERTY = config("OIDC_ROLES_PROPERTY") + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl=OIDC_TOKEN_ENDPOINT) + + +class AppUser(BaseModel): + id: Optional[int] + username: Optional[str] + email: Optional[str] + first_name: Optional[str] + last_name: Optional[str] + roles: Optional[List[str]] + + +async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], db: Session = Depends(get_db)): + decoded = decode_token(token) + user_in_db = UserService(db).get_user(decoded[OIDC_USERNAME_PROPERTY]) + if not user_in_db: + raise HTTPException(status_code=401, detail="You are not an authorized user.") + user = AppUser( + id=user_in_db.id, + email=decoded["email"], + first_name=decoded["given_name"], + last_name=decoded["family_name"], + roles=[], + ) + if USE_OIDC_ROLES: + user.roles = decoded[OIDC_ROLES_PROPERTY].copy() + else: + user_roles = UserService(db).get_user_roles(user.id) + for user_role in user_roles: + user.roles.append(user_role.role.name) + return user diff --git a/api/models/user.py b/api/models/user.py index e6ce42d7c..e38812418 100644 --- a/api/models/user.py +++ b/api/models/user.py @@ -1,5 +1,7 @@ from sqlalchemy import Date, Column, ForeignKey, Integer, String, Numeric, UniqueConstraint, CheckConstraint - +from sqlalchemy.orm import relationship +from sqlalchemy.orm import Mapped +from sqlalchemy.orm import mapped_column from db.base_class import Base @@ -18,11 +20,12 @@ class UserGroup(Base): name = Column(String(length=128), nullable=True, unique=True) -class UserPermissions(Base): +class UserRoles(Base): __tablename__ = "belongs" - - group_id = Column("user_groupid", ForeignKey("user_group.id"), nullable=False, primary_key=True) - user_id = Column("usrid", Integer, ForeignKey("usr.id"), nullable=False, primary_key=True) + group_id: Mapped[int] = mapped_column("user_groupid", ForeignKey("user_group.id"), nullable=False, primary_key=True) + user_id: Mapped[int] = mapped_column("usrid", ForeignKey("usr.id"), nullable=False, primary_key=True) + role: Mapped["UserGroup"] = relationship() + user: Mapped["User"] = relationship() class UserLocation(Base): diff --git a/api/routers/v1/timelog.py b/api/routers/v1/timelog.py index 61ef3fc35..6702496c8 100644 --- a/api/routers/v1/timelog.py +++ b/api/routers/v1/timelog.py @@ -1,6 +1,6 @@ from typing import List from datetime import date -from fastapi import APIRouter, Depends, Query +from fastapi import APIRouter, Depends, Query, HTTPException from sqlalchemy.orm import Session from schemas.timelog import ( @@ -13,24 +13,34 @@ from db.db_connection import get_db from auth.auth_bearer import BearerToken +from dependencies import get_current_user -router = APIRouter(prefix="/timelog", tags=["timelog"], responses={"404": {"description": "Not found"}}) +router = APIRouter( + prefix="/timelog", + tags=["timelog"], + responses={403: {"description": "Forbidden"}, 404: {"description": "Not found"}}, + dependencies=[Depends(BearerToken())], +) -@router.get("/task_types/", dependencies=[Depends(BearerToken())], response_model=List[TaskTypeItem]) +@router.get("/task_types/", response_model=List[TaskTypeItem]) async def get_task_types(db: Session = Depends(get_db), skip: int = 0, limit: int = 100): items = TaskTypeService(db).get_items() return items -@router.get("/templates/{user_id}", dependencies=[Depends(BearerToken())], response_model=List[TemplateSchema]) -async def get_user_templates(user_id: int, db: Session = Depends(get_db)): +@router.get("/templates", response_model=List[TemplateSchema]) +async def get_user_templates(user_id: int, current_user=Depends(get_current_user), db: Session = Depends(get_db)): + if user_id != current_user.id: + raise HTTPException(status_code=403, detail="You are not authorized to see templates for this user") templates = TemplateService(db).get_user_templates(user_id) return templates -@router.post("/templates", dependencies=[Depends(BearerToken())], response_model=TemplateSchema, status_code=201) -async def add_template(template: TemplateNewSchema, db: Session = Depends(get_db)): +@router.post("/templates", response_model=TemplateSchema, status_code=201) +async def add_template( + template: TemplateNewSchema, current_user=Depends(get_current_user), db: Session = Depends(get_db) +): """ Create a template with all the information: @@ -46,18 +56,26 @@ async def add_template(template: TemplateNewSchema, db: Session = Depends(get_db \f :param item: User input. """ + if template.is_global and "manager" not in current_user.roles: + raise HTTPException(status_code=403, detail="You are not authorized to create global templates") + else: + if template.user_id and template.user_id != current_user.id: + raise HTTPException(status_code=403, detail="You are not authorized to create templates for this user") result = TemplateService(db).create_template(template) return result -@router.get("/tasks", dependencies=[Depends(BearerToken())], response_model=List[TaskSchema]) +@router.get("/tasks", response_model=List[TaskSchema]) async def get_user_tasks( user_id: int, + current_user=Depends(get_current_user), db: Session = Depends(get_db), offset: int = Query(0, ge=0), limit: int = Query(25, ge=0, le=500), start: date = date.today(), end: date = date.today(), ): + if current_user.id != user_id and "manager" not in current_user.roles: + raise HTTPException(status_code=403, detail="You are not authorized to view tasks for this user") tasks = TaskService(db).get_user_tasks(user_id, offset, limit, start, end) return tasks diff --git a/api/schemas/user.py b/api/schemas/user.py new file mode 100644 index 000000000..e44dc6c45 --- /dev/null +++ b/api/schemas/user.py @@ -0,0 +1,24 @@ +from pydantic import BaseModel + + +class User(BaseModel): + id: int + login: str + + class Config: + orm_mode = True + + +class UserGroup(BaseModel): + id: int + name: str + + class Config: + orm_mode = True + + +class UserRoles(BaseModel): + role: UserGroup + + class Config: + orm_mode = True diff --git a/api/services/user.py b/api/services/user.py new file mode 100644 index 000000000..c79b8ee2a --- /dev/null +++ b/api/services/user.py @@ -0,0 +1,18 @@ +from typing import List + +from services.main import AppService +from models.user import User, UserRoles + + +class UserService(AppService): + def get_users(self) -> List[User]: + users = self.db.query(User).all() or [] + return users + + def get_user(self, username: str) -> User: + user = self.db.query(User).filter(User.login == username).first() or None + return user + + def get_user_roles(self, user_id: int) -> List[UserRoles]: + roles = self.db.query(UserRoles).filter(UserRoles.user_id == user_id).all() or [] + return roles