From 0a1613b197a1396c1cbfb019434e76b35c283caf Mon Sep 17 00:00:00 2001 From: Melissa Autumn Date: Wed, 8 May 2024 14:32:23 -0700 Subject: [PATCH] =?UTF-8?q?=E2=9E=95=20Add=20admin=20dependency=20for=20ad?= =?UTF-8?q?min=20invite=20routes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andreas Müller --- backend/src/appointment/dependencies/auth.py | 23 ++++++++++++++++++- .../src/appointment/exceptions/validation.py | 9 ++++++++ backend/src/appointment/routes/invite.py | 14 ++++++----- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/backend/src/appointment/dependencies/auth.py b/backend/src/appointment/dependencies/auth.py index ccece129f..322c68471 100644 --- a/backend/src/appointment/dependencies/auth.py +++ b/backend/src/appointment/dependencies/auth.py @@ -10,7 +10,7 @@ from ..database import repo, schemas from ..dependencies.database import get_db from ..exceptions import validation -from ..exceptions.validation import InvalidTokenException +from ..exceptions.validation import InvalidTokenException, InvalidPermissionLevelException oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False) @@ -55,6 +55,27 @@ def get_subscriber( return user +def get_admin_subscriber( + token: Annotated[str, Depends(oauth2_scheme)], + db: Session = Depends(get_db), +): + if token is None: + raise InvalidTokenException() + + """Automatically retrieve and return the subscriber""" + user = get_user_from_token(db, token) + + if user is None: + raise InvalidTokenException() + + # check admin allow list + ALLOW_LIST = os.getenv("APP_ADMIN_ALLOW_LIST", '').split(',') + if not user.email in ALLOW_LIST: + raise InvalidPermissionLevelException() + + return user + + def get_subscriber_from_signed_url( url: str = Body(..., embed=True), db: Session = Depends(get_db), diff --git a/backend/src/appointment/exceptions/validation.py b/backend/src/appointment/exceptions/validation.py index e939b2977..91bfe60d8 100644 --- a/backend/src/appointment/exceptions/validation.py +++ b/backend/src/appointment/exceptions/validation.py @@ -20,6 +20,15 @@ def get_msg(self): return l10n('unknown-error') +class InvalidPermissionLevelException(APIException): + """Raise when the subscribers permission level is too low for the action""" + id_code = 'INVALID_PERMISSION_LEVEL' + status_code = 401 + + def get_msg(self): + return l10n('protected-route-fail') + + class InvalidTokenException(APIException): """Raise when the subscriber could not be parsed from the auth token""" id_code = 'INVALID_TOKEN' diff --git a/backend/src/appointment/routes/invite.py b/backend/src/appointment/routes/invite.py index f783a132b..01eb7e402 100644 --- a/backend/src/appointment/routes/invite.py +++ b/backend/src/appointment/routes/invite.py @@ -4,7 +4,8 @@ from sqlalchemy.orm import Session from ..database import repo, schemas, models -from ..dependencies.auth import get_subscriber, get_subscriber_from_signed_url +from ..database.models import Subscriber +from ..dependencies.auth import get_admin_subscriber from ..dependencies.database import get_db from ..exceptions import validation @@ -13,13 +14,14 @@ @router.get('/', response_model=list[schemas.Invite]) -def get_all_invites(db: Session = Depends(get_db)): +def get_all_invites(db: Session = Depends(get_db), admin: Subscriber = Depends(get_admin_subscriber)): + """List all existing invites, needs admin permissions""" return db.query(models.Invite).all() @router.post("/generate/{n}", response_model=list[schemas.Invite]) -def generate_invite_codes(n: int, db: Session = Depends(get_db)): - """endpoint to generate n invite codes""" +def generate_invite_codes(n: int, db: Session = Depends(get_db), admin: Subscriber = Depends(get_admin_subscriber)): + """endpoint to generate n invite codes, needs admin permissions""" return repo.invite.generate_codes(db, n) @@ -39,8 +41,8 @@ def use_invite_code(code: str, db: Session = Depends(get_db)): @router.put("/revoke/{code}") -def revoke_invite_code(code: str, db: Session = Depends(get_db)): - """endpoint to revoke a given invite code and mark in unavailable""" +def revoke_invite_code(code: str, db: Session = Depends(get_db), admin: Subscriber = Depends(get_admin_subscriber)): + """endpoint to revoke a given invite code and mark in unavailable, needs admin permissions""" if not repo.invite.code_exists(db, code): raise validation.InviteCodeNotFoundException() if not repo.invite.code_is_available(db, code):