-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
17 changed files
with
214 additions
and
241 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Callable, TypeVar | ||
|
||
RT = TypeVar("RT") | ||
|
||
|
||
class UnauthorizedError(RuntimeError): | ||
"""We raise this exception when a user tries to access a resource without the proper permissions.""" | ||
|
||
def __init__(self, function: Callable[..., RT], *args: str, **kwargs: int): | ||
super().__init__("unauthorized access to resource") | ||
self.function = function | ||
self.args = args | ||
self.kwargs = kwargs | ||
|
||
def __str__(self): | ||
return ( | ||
f"UnauthorizedError from [{self.function}] - ({self.args} ; {self.kwargs})" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
# -*- coding: utf-8 -*-# | ||
"""Permission classes and functions for the role-based access control module.""" | ||
from __future__ import annotations | ||
|
||
from abc import ABC | ||
from enum import Enum, auto | ||
from typing import Dict, TypeVar | ||
|
||
RT = TypeVar("RT") | ||
|
||
|
||
class Namespace(Enum): | ||
"""Namespaces define where a permission applies""" | ||
|
||
System = auto() | ||
User = auto() | ||
Peer = auto() | ||
|
||
|
||
class Permission(Enum): | ||
"""We have two permissions: Read and Write, Write implies read.""" | ||
|
||
Read = auto() | ||
Write = auto() | ||
|
||
|
||
class Permissions(ABC): | ||
def __init__(self, name: str, permissions: Dict[Namespace, Permission]): | ||
self.name = name | ||
self.permissions: Dict[Namespace, Permission] = permissions | ||
|
||
def __len__(self): | ||
return len(self.permissions.keys()) | ||
|
||
def __repr__(self): | ||
return f"<Permissions {self.name}>" | ||
|
||
def __iter__(self): | ||
for permission in self.permissions: | ||
yield permission | ||
|
||
def can_read(self, namespace: Namespace): | ||
if namespace not in self.permissions.keys(): | ||
return False | ||
return ( | ||
self.permissions[namespace] == Permission.Read | ||
or self.permissions[namespace] == Permission.Write | ||
) | ||
|
||
def can_write(self, namespace: Namespace): | ||
if namespace not in self.permissions.keys(): | ||
return False | ||
return self.permissions[namespace] == Permission.Write | ||
|
||
|
||
class AdministratorPermissions(Permissions): | ||
def __init__(self): | ||
super().__init__( | ||
"System Administrator", | ||
{Namespace.System: Permission.Write, Namespace.User: Permission.Read}, | ||
) | ||
|
||
def __str__(self): | ||
return self.name | ||
|
||
|
||
class PeerPermissions(Permissions): | ||
def __init__(self): | ||
super().__init__( | ||
"Peer Node", | ||
{Namespace.Peer: Permission.Write}, | ||
) | ||
|
||
def __str__(self): | ||
return self.name | ||
|
||
|
||
class TenantPermissions(Permissions): | ||
def __init__(self): | ||
super().__init__( | ||
"TenantOwner", | ||
{Namespace.System: Permission.Read, Namespace.User: Permission.Write}, | ||
) | ||
|
||
def __str__(self): | ||
return self.name |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from __future__ import annotations | ||
|
||
from functools import wraps | ||
from typing import Callable | ||
|
||
from starlette.requests import Request | ||
|
||
from horao.auth.error import UnauthorizedError | ||
from horao.auth.permissions import RT, Namespace, Permission | ||
from horao.auth.roles import User | ||
|
||
|
||
def permission_required( | ||
namespace: Namespace, permission: Permission | ||
) -> Callable[[Callable[..., RT]], Callable[..., RT]]: | ||
""" | ||
Decorator to check if a user has the permission to access a resource. | ||
:param namespace: namespace to check | ||
:param permission: permission to check | ||
:return: function call if the user has the permission | ||
:raises: UnauthorizedError if the user does not have the permission | ||
""" | ||
|
||
def decorator(func: Callable[..., RT]) -> Callable[..., RT]: | ||
@wraps(func) | ||
async def wrapper(*args: str, **kwargs: int) -> RT: | ||
for arg in args: | ||
if isinstance(arg, Request): | ||
if not arg.user: | ||
raise UnauthorizedError(func, *args, **kwargs) | ||
if isinstance(arg.user, User): | ||
if permission.Write and any( | ||
[p for p in arg.user.permissions if p.can_write(namespace)] | ||
): | ||
return await func(*args, **kwargs) | ||
if permission.Read and any( | ||
[p for p in arg.user.permissions if p.can_read(namespace)] | ||
): | ||
return await func(*args, **kwargs) | ||
raise UnauthorizedError(func, *args, **kwargs) | ||
|
||
return wrapper # type: ignore | ||
|
||
return decorator |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.