Skip to content

Commit

Permalink
chore: Add type annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
g3n35i5 committed Jun 15, 2024
1 parent 5da3ac4 commit 69733c0
Show file tree
Hide file tree
Showing 94 changed files with 628 additions and 562 deletions.
8 changes: 5 additions & 3 deletions src/shop_db2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
# -*- coding: utf-8 -*-
__author__ = "g3n35i5"

from typing import Optional

from sqlalchemy.exc import DontWrapMixin

# Base exception for all exceptions (needed for instance check)


class ShopdbException(Exception, DontWrapMixin):
type: str = None
message: str = None
code: int = None
type: Optional[str] = None
message: Optional[str] = None
code: Optional[int] = None


# App related exceptions.
Expand Down
71 changes: 36 additions & 35 deletions src/shop_db2/helpers/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
__author__ = "g3n35i5"

from functools import wraps
from typing import Any, Callable

import jwt
from flask import Response, request
Expand All @@ -12,11 +13,11 @@
from shop_db2.models import User


def checkIfUserIsValid(f):
def checkIfUserIsValid(func: Callable) -> Callable:
"""This function checks whether the requested user exists, has been verified and is active.
If this is not the case the request will be blocked.
:param f: Is the wrapped function.
:param func: Is the wrapped function.
:return: The wrapped function f with the additional parameter user.
Expand All @@ -25,8 +26,8 @@ def checkIfUserIsValid(f):
:raises UserIsInactive: If the user with this ID is inactive.
"""

@wraps(f)
def decorator(*args, **kwargs):
@wraps(func)
def decorator(*args: Any, **kwargs: Any) -> Any:
user = User.query.filter_by(id=kwargs["user_id"]).first()
if not user:
raise exc.EntryNotFound()
Expand All @@ -40,17 +41,17 @@ def decorator(*args, **kwargs):
raise exc.UserIsInactive()

# If all criteria are met, the requested function can be executed.
return f(user, *args, **kwargs)
return func(user, *args, **kwargs)

return decorator


def adminRequired(f):
def adminRequired(func: Callable) -> Callable:
"""This function checks whether a valid token is contained in the request.
If this is not the case, or the user has no admin rights, the request
will be blocked.
:param f: Is the wrapped function.
:param func: Is the wrapped function.
:return: The wrapped function f with the additional
parameter admin.
Expand All @@ -64,48 +65,48 @@ def adminRequired(f):
:raises UnauthorizedAccess: The user has no administrator privileges.
"""

@wraps(f)
def decorated(*args, **kwargs):
@wraps(func)
def decorated(*args: Any, **kwargs: Any) -> Any:
# Does the request header contain a token?
try:
token = request.headers["token"]
except KeyError:
raise exc.UnauthorizedAccess()
except KeyError as error:
raise exc.UnauthorizedAccess() from error

# Is the token valid?
try:
data = jwt.decode(token, app.config["SECRET_KEY"])
except jwt.exceptions.DecodeError:
raise exc.TokenIsInvalid()
except jwt.ExpiredSignatureError:
raise exc.TokenHasExpired()
except jwt.exceptions.DecodeError as error:
raise exc.TokenIsInvalid() from error
except jwt.ExpiredSignatureError as error:
raise exc.TokenHasExpired() from error

# If there is no admin object in the token and does the user does have
# admin rights?
try:
admin_id = data["user"]["id"]
admin = User.query.filter(User.id == admin_id).first()
assert admin.is_admin is True
except KeyError:
raise exc.TokenIsInvalid()
except AssertionError:
raise exc.UnauthorizedAccess()
except KeyError as error:
raise exc.TokenIsInvalid() from error
except AssertionError as error:
raise exc.UnauthorizedAccess() from error

# At this point it was verified that the request comes from an
# admin and the request is executed. In addition, the user is
# forwarded to the following function so that the administrator
# responsible for any changes in the database can be traced.
return f(admin=admin, *args, **kwargs)
return func(admin=admin, *args, **kwargs)

return decorated


def adminOptional(f):
def adminOptional(func: Callable) -> Callable:
"""This function checks whether a valid token is contained in the request.
If this is not the case, or the user has no admin rights, the following
function returns only a part of the available data.
:param f: Is the wrapped function.
:param func: Is the wrapped function.
:return: Returns the wrapped function f with the
additional parameter admin, if present.
Expand All @@ -117,56 +118,56 @@ def adminOptional(f):
token.
"""

@wraps(f)
def decorated(*args, **kwargs):
@wraps(func)
def decorated(*args: Any, **kwargs: Any) -> Any:
# Does the request header contain a token?
try:
token = request.headers["token"]
except KeyError:
return f(admin=None, *args, **kwargs)
return func(admin=None, *args, **kwargs)

# Is the token valid?
try:
data = jwt.decode(token, app.config["SECRET_KEY"])
except (jwt.exceptions.DecodeError, jwt.ExpiredSignatureError):
return f(admin=None, *args, **kwargs)
return func(admin=None, *args, **kwargs)

# If there is no admin object in the token and does the user does have
# admin rights?
try:
admin_id = data["user"]["id"]
admin = User.query.filter(User.id == admin_id).first()
assert admin.is_admin is True
except KeyError:
raise exc.TokenIsInvalid()
except KeyError as error:
raise exc.TokenIsInvalid() from error
except AssertionError:
return f(admin=None, *args, **kwargs)
return func(admin=None, *args, **kwargs)

# At this point it was verified that the request comes from an
# admin and the request is executed. In addition, the user is
# forwarded to the following function so that the administrator
# responsible for any changes in the database can be traced.
return f(admin=admin, *args, **kwargs)
return func(admin=admin, *args, **kwargs)

return decorated


def deprecate_route(message=""):
def deprecate_route(message: str = "") -> Callable:
"""This decorator adds a warning message to the response header when the route is marked as deprecated.
:param message: The message to be added to the response header.
"""

def _decorator(func):
def _decorator(func: Callable) -> Callable:
@wraps(func)
def _wrapper(*args, **kwargs):
def _wrapper(*args: Any, **kwargs: Any) -> Any:
data = func(*args, **kwargs)
# Case 1: Tuple with (Response object, Status code)
if isinstance(data, tuple):
response: Response = data[0]
response = data[0]
# Case 2: Plain response object
elif isinstance(data, Response):
response: Response = data
response = data
else:
return data
response.headers["Warning"] = message
Expand Down
8 changes: 5 additions & 3 deletions src/shop_db2/helpers/deposits.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# -*- coding: utf-8 -*-
__author__ = "g3n35i5"

from typing import Any, Dict

from sqlalchemy.exc import IntegrityError

import shop_db2.exceptions as exc
Expand All @@ -10,7 +12,7 @@
from shop_db2.models import Deposit, User


def insert_deposit(data, admin):
def insert_deposit(data: Dict[str, Any], admin: User) -> None:
"""This help function creates a new deposit with the given data.
:raises DataIsMissing: If not all required data is available.
Expand Down Expand Up @@ -45,5 +47,5 @@ def insert_deposit(data, admin):
deposit = Deposit(**data)
deposit.admin_id = admin.id
db.session.add(deposit)
except IntegrityError:
raise exc.CouldNotCreateEntry()
except IntegrityError as error:
raise exc.CouldNotCreateEntry() from error
7 changes: 4 additions & 3 deletions src/shop_db2/helpers/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# -*- coding: utf-8 -*-
__author__ = "g3n35i5"

from typing import Any

import werkzeug.exceptions as werkzeug_exceptions
from flask import jsonify

Expand All @@ -10,7 +12,7 @@


@app.errorhandler(Exception)
def handle_error(error):
def handle_error(error: Exception) -> Any:
"""This wrapper catches all exceptions and, if possible, returns a user
friendly response. Otherwise, it will raise the error
Expand Down Expand Up @@ -44,5 +46,4 @@ def handle_error(error):
# Create, if possible, a user friendly response.
if isinstance(error, exc.ShopdbException):
return jsonify(result=error.type, message=error.message), error.code
else: # pragma: no cover
raise error
raise error
12 changes: 8 additions & 4 deletions src/shop_db2/helpers/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@
import time

from flask import g, request
from flask.wrappers import Response

import shop_db2.exceptions as exc
from shop_db2.api import app
from shop_db2.helpers.decorators import adminOptional
from shop_db2.models.user import User


@app.before_request
@adminOptional
def before_request_hook(admin):
def before_request_hook(admin: User) -> None:
"""This function is executed before each request is processed. Its purpose is
to check whether the application is currently in maintenance mode. If this
is the case, the current request is aborted and a corresponding exception
Expand Down Expand Up @@ -51,15 +53,17 @@ def before_request_hook(admin):


@app.after_request
def after_request_hook(response):
def after_request_hook(response: Response) -> Response:
"""This functions gets executed each time a request is finished.
:param response: is the response to be returned.
:return: The request response.
"""
# If the app is in DEBUG mode, log the request execution time
if app.logger.level == logging.DEBUG:
if app.logger.level == logging.DEBUG: # pylint: disable
execution_time = datetime.timedelta(seconds=(time.time() - g.start))
app.logger.debug("Request execution time for '{}': {}".format(request.endpoint, execution_time))
app.logger.debug( # pylint: disable
"Request execution time for '{}': {}".format(request.endpoint, execution_time)
)

return response
6 changes: 3 additions & 3 deletions src/shop_db2/helpers/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from shop_db2.models import Product, ProductPrice


def _shift_date_to_begin_of_day(date):
def _shift_date_to_begin_of_day(date: datetime.datetime) -> datetime.datetime:
"""This function moves a timestamp to the beginning of the day.
:param date: Is the date to be moved.
Expand All @@ -21,7 +21,7 @@ def _shift_date_to_begin_of_day(date):
return date.replace(hour=0, minute=0, second=0, microsecond=0)


def _shift_date_to_end_of_day(date):
def _shift_date_to_end_of_day(date: datetime.datetime) -> datetime.datetime:
"""This function moves a timestamp to the end of the day.
:param date: Is the date to be moved.
Expand All @@ -30,7 +30,7 @@ def _shift_date_to_end_of_day(date):
return date.replace(hour=23, minute=59, second=59, microsecond=999999)


def _get_product_mean_price_in_time_range(product_id, start, end):
def _get_product_mean_price_in_time_range(product_id: int, start: datetime.datetime, end: datetime.datetime) -> int:
"""This function calculates the mean price in a given range of time.
:param product_id: Is the product id.
Expand Down
Loading

0 comments on commit 69733c0

Please sign in to comment.