-
Notifications
You must be signed in to change notification settings - Fork 0
/
auth.py
134 lines (106 loc) · 3.91 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# -*- coding: utf-8 -*-
import functools
import flask
import core
import jwt
import base64
from datetime import datetime
from werkzeug.exceptions import Unauthorized, Forbidden
def requires_auth(f):
"""
Requires user authentication information to be supplied as a JWT.
Decorator function for Flask requests. If the JWT can be decoded successfully, ``flask.g.user`` will be set with the
JWT payload.
:param f: function or method to be decorated
:return: decorated function
"""
@functools.wraps(f)
def decorated_f(*args, **kwargs):
token = auth_token(flask.request.headers)
flask.g.user = authenticate_user(token)
return f(*args, **kwargs)
return decorated_f
def requires_role(role):
"""
Requires user authentication information to be supplied as a JWT with ``role`` in ``payload['roles']``.
Decorator function for Flask requests. If the JWT can be decoded successfully, ``flask.g.user`` will be set with the
JWT payload.
:param role: required role
:type role: str
:return: decorated function
"""
def decorate(f):
def decorated_f(*args, **kwargs):
token = auth_token(flask.request.headers)
user = authenticate_user(token)
try:
if role not in user["roles"]:
raise Forbidden('Requires role `{}`'.format(role))
except KeyError:
raise Forbidden('No roles provided in token')
flask.g.user = user
return f(*args, **kwargs)
return decorated_f
return decorate
def auth_token(headers):
"""
Return Bearer token from Authorization header.
Raises :class:`werkzeug.exceptions.Unauthorized` if no or invalid Authorization header provided.
:param headers: request headers
:type headers: dict
:return: Bearer token
:rtype: str
"""
auth = headers.get('Authorization', None)
if not auth:
raise Unauthorized('Authorization header is expected')
parts = auth.split()
if parts[0].lower() != 'bearer':
raise Unauthorized('Authorization header must start with Bearer')
elif len(parts) == 1:
raise Unauthorized('Token not found')
elif len(parts) > 2:
raise Unauthorized('Authorization header must be Bearer + \s + token')
return parts[1]
def authenticate_user(token):
"""
Authenticate and return user from JSON Web Token.
Raises :class:`werkzeug.exceptions.Unauthorized` if token is invalid.
:param token: JWT
:type token: str
:return: user
:rtype: dict
"""
client_id = core.app.flask_app.config['AUTH_CLIENT_ID']
client_secret = core.app.flask_app.config['AUTH_CLIENT_SECRET']
try:
return jwt.decode(
token,
base64.b64decode(client_secret.replace("_", "/").replace("-", "+")),
audience=client_id
)
except jwt.ExpiredSignature:
raise Unauthorized('Token is expired')
except jwt.InvalidAudienceError:
raise Unauthorized('Incorrect audience')
except jwt.DecodeError:
raise Unauthorized('Token signature is invalid')
except jwt.InvalidTokenError: # jwt base exception
raise Unauthorized('Token invalid')
def create_jwt(payload, set_audience=True):
"""
Return a JSON Web Token with data ``payload``.
Automatically adds ``aud`` to payload if ``set_audience``.
:param payload: data to encode in jwt
:type payload: dict
:param set_audience: whether to set the audience info
:type set_audience: bool
:return: JWT
:rtype: str
"""
secret = core.app.flask_app.config['AUTH_CLIENT_SECRET']
signature = base64.b64decode(secret.replace("_", "/").replace("-", "+"))
if set_audience:
payload['aud'] = core.app.flask_app.config['AUTH_CLIENT_ID']
payload['iat'] = datetime.utcnow()
return jwt.encode(payload, signature).decode('utf-8') # return as string instead of bytes