-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement user check and start of permissions
- Add get_current_user to dependencies that introspects token and builds user info based on username in token - Add config values to indicate use of OIDC roles (passed in token) or roles in local db - Add the user dependency to template endpoints and check user id against logged in user - Update user models and schema as needed - Add user service to get users and user roles
- Loading branch information
Showing
6 changed files
with
125 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,47 @@ | ||
# from typing import Annotated | ||
from typing import Annotated, Optional, List | ||
from fastapi import Depends, HTTPException | ||
from sqlalchemy.orm import Session | ||
from fastapi.security import OAuth2PasswordBearer | ||
from pydantic import BaseModel | ||
from decouple import config | ||
from db.db_connection import get_db | ||
from auth.auth_handler import decode_token | ||
from services.user import UserService | ||
|
||
# from fastapi import Header, HTTPException | ||
|
||
OIDC_TOKEN_ENDPOINT = config("OIDC_TOKEN_ENDPOINT") | ||
OIDC_USERNAME_PROPERTY = config("OIDC_USERNAME_PROPERTY") | ||
USE_OIDC_ROLES = config("USE_OIDC_ROLES") | ||
OIDC_ROLES_PROPERTY = config("OIDC_ROLES_PROPERTY") | ||
|
||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=OIDC_TOKEN_ENDPOINT) | ||
|
||
|
||
class AppUser(BaseModel): | ||
id: Optional[int] | ||
username: Optional[str] | ||
email: Optional[str] | ||
first_name: Optional[str] | ||
last_name: Optional[str] | ||
roles: Optional[List[str]] | ||
|
||
|
||
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], db: Session = Depends(get_db)): | ||
decoded = decode_token(token) | ||
user_in_db = UserService(db).get_user(decoded[OIDC_USERNAME_PROPERTY]) | ||
if not user_in_db: | ||
raise HTTPException(status_code=401, detail="You are not an authorized user.") | ||
user = AppUser( | ||
id=user_in_db.id, | ||
email=decoded["email"], | ||
first_name=decoded["given_name"], | ||
last_name=decoded["family_name"], | ||
roles=[], | ||
) | ||
if USE_OIDC_ROLES: | ||
user.roles = decoded[OIDC_ROLES_PROPERTY].copy() | ||
else: | ||
user_roles = UserService(db).get_user_roles(user.id) | ||
for user_role in user_roles: | ||
user.roles.append(user_role.role.name) | ||
return user |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from pydantic import BaseModel | ||
|
||
|
||
class User(BaseModel): | ||
id: int | ||
login: str | ||
|
||
class Config: | ||
orm_mode = True | ||
|
||
|
||
class UserGroup(BaseModel): | ||
id: int | ||
name: str | ||
|
||
class Config: | ||
orm_mode = True | ||
|
||
|
||
class UserRoles(BaseModel): | ||
role: UserGroup | ||
|
||
class Config: | ||
orm_mode = True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from typing import List | ||
|
||
from services.main import AppService | ||
from models.user import User, UserRoles | ||
|
||
|
||
class UserService(AppService): | ||
def get_users(self) -> List[User]: | ||
users = self.db.query(User).all() or [] | ||
return users | ||
|
||
def get_user(self, username: str) -> User: | ||
user = self.db.query(User).filter(User.login == username).first() or None | ||
return user | ||
|
||
def get_user_roles(self, user_id: int) -> List[UserRoles]: | ||
roles = self.db.query(UserRoles).filter(UserRoles.user_id == user_id).all() or [] | ||
return roles |