Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement user check and start of permissions #683

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ OIDC_AUTHORITY=
OIDC_CLIENT_ID=
OIDC_USERNAME_PROPERTY=
OIDC_CERTS_URL=
OIDC_TOKEN_ENDPOINT=
USE_OIDC_ROLES=
OIDC_ROLES_PROPERTY=
JWT_ALGORITHM=
JWT_SECRET=
JWT_AUDIENCE=
Expand Down
48 changes: 46 additions & 2 deletions api/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,47 @@
# from typing import Annotated
from typing import Annotated, Optional, List
from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from decouple import config
from db.db_connection import get_db
from auth.auth_handler import decode_token
from services.user import UserService

# from fastapi import Header, HTTPException

OIDC_TOKEN_ENDPOINT = config("OIDC_TOKEN_ENDPOINT")
OIDC_USERNAME_PROPERTY = config("OIDC_USERNAME_PROPERTY")
USE_OIDC_ROLES = config("USE_OIDC_ROLES")
OIDC_ROLES_PROPERTY = config("OIDC_ROLES_PROPERTY")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl=OIDC_TOKEN_ENDPOINT)


class AppUser(BaseModel):
id: Optional[int]
username: Optional[str]
email: Optional[str]
first_name: Optional[str]
last_name: Optional[str]
roles: Optional[List[str]]


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], db: Session = Depends(get_db)):
decoded = decode_token(token)
user_in_db = UserService(db).get_user(decoded[OIDC_USERNAME_PROPERTY])
if not user_in_db:
raise HTTPException(status_code=401, detail="You are not an authorized user.")
user = AppUser(
id=user_in_db.id,
email=decoded["email"],
first_name=decoded["given_name"],
last_name=decoded["family_name"],
roles=[],
)
if USE_OIDC_ROLES:
user.roles = decoded[OIDC_ROLES_PROPERTY].copy()
else:
user_roles = UserService(db).get_user_roles(user.id)
for user_role in user_roles:
user.roles.append(user_role.role.name)
return user
13 changes: 8 additions & 5 deletions api/models/user.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from sqlalchemy import Date, Column, ForeignKey, Integer, String, Numeric, UniqueConstraint, CheckConstraint

from sqlalchemy.orm import relationship
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from db.base_class import Base


Expand All @@ -18,11 +20,12 @@ class UserGroup(Base):
name = Column(String(length=128), nullable=True, unique=True)


class UserPermissions(Base):
class UserRoles(Base):
__tablename__ = "belongs"

group_id = Column("user_groupid", ForeignKey("user_group.id"), nullable=False, primary_key=True)
user_id = Column("usrid", Integer, ForeignKey("usr.id"), nullable=False, primary_key=True)
group_id: Mapped[int] = mapped_column("user_groupid", ForeignKey("user_group.id"), nullable=False, primary_key=True)
user_id: Mapped[int] = mapped_column("usrid", ForeignKey("usr.id"), nullable=False, primary_key=True)
role: Mapped["UserGroup"] = relationship()
user: Mapped["User"] = relationship()


class UserLocation(Base):
Expand Down
34 changes: 26 additions & 8 deletions api/routers/v1/timelog.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import List
from datetime import date
from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends, Query, HTTPException
from sqlalchemy.orm import Session

from schemas.timelog import (
Expand All @@ -13,24 +13,34 @@

from db.db_connection import get_db
from auth.auth_bearer import BearerToken
from dependencies import get_current_user

router = APIRouter(prefix="/timelog", tags=["timelog"], responses={"404": {"description": "Not found"}})
router = APIRouter(
prefix="/timelog",
tags=["timelog"],
responses={403: {"description": "Forbidden"}, 404: {"description": "Not found"}},
dependencies=[Depends(BearerToken())],
)


@router.get("/task_types/", dependencies=[Depends(BearerToken())], response_model=List[TaskTypeItem])
@router.get("/task_types/", response_model=List[TaskTypeItem])
async def get_task_types(db: Session = Depends(get_db), skip: int = 0, limit: int = 100):
items = TaskTypeService(db).get_items()
return items


@router.get("/templates/{user_id}", dependencies=[Depends(BearerToken())], response_model=List[TemplateSchema])
async def get_user_templates(user_id: int, db: Session = Depends(get_db)):
@router.get("/templates", response_model=List[TemplateSchema])
async def get_user_templates(user_id: int, current_user=Depends(get_current_user), db: Session = Depends(get_db)):
if user_id != current_user.id:
raise HTTPException(status_code=403, detail="You are not authorized to see templates for this user")
templates = TemplateService(db).get_user_templates(user_id)
return templates


@router.post("/templates", dependencies=[Depends(BearerToken())], response_model=TemplateSchema, status_code=201)
async def add_template(template: TemplateNewSchema, db: Session = Depends(get_db)):
@router.post("/templates", response_model=TemplateSchema, status_code=201)
async def add_template(
template: TemplateNewSchema, current_user=Depends(get_current_user), db: Session = Depends(get_db)
):
"""
Create a template with all the information:

Expand All @@ -46,18 +56,26 @@ async def add_template(template: TemplateNewSchema, db: Session = Depends(get_db
\f
:param item: User input.
"""
if template.is_global and "manager" not in current_user.roles:
raise HTTPException(status_code=403, detail="You are not authorized to create global templates")
else:
if template.user_id and template.user_id != current_user.id:
raise HTTPException(status_code=403, detail="You are not authorized to create templates for this user")
result = TemplateService(db).create_template(template)
return result


@router.get("/tasks", dependencies=[Depends(BearerToken())], response_model=List[TaskSchema])
@router.get("/tasks", response_model=List[TaskSchema])
async def get_user_tasks(
user_id: int,
current_user=Depends(get_current_user),
db: Session = Depends(get_db),
offset: int = Query(0, ge=0),
limit: int = Query(25, ge=0, le=500),
start: date = date.today(),
end: date = date.today(),
):
if current_user.id != user_id and "manager" not in current_user.roles:
raise HTTPException(status_code=403, detail="You are not authorized to view tasks for this user")
tasks = TaskService(db).get_user_tasks(user_id, offset, limit, start, end)
return tasks
24 changes: 24 additions & 0 deletions api/schemas/user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from pydantic import BaseModel


class User(BaseModel):
id: int
login: str

class Config:
orm_mode = True


class UserGroup(BaseModel):
id: int
name: str

class Config:
orm_mode = True


class UserRoles(BaseModel):
role: UserGroup

class Config:
orm_mode = True
18 changes: 18 additions & 0 deletions api/services/user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import List

from services.main import AppService
from models.user import User, UserRoles


class UserService(AppService):
def get_users(self) -> List[User]:
users = self.db.query(User).all() or []
return users

def get_user(self, username: str) -> User:
user = self.db.query(User).filter(User.login == username).first() or None
return user

def get_user_roles(self, user_id: int) -> List[UserRoles]:
roles = self.db.query(UserRoles).filter(UserRoles.user_id == user_id).all() or []
return roles