forked from jupyterhub/oauthenticator
-
Notifications
You must be signed in to change notification settings - Fork 1
/
cilogon.py
218 lines (175 loc) · 7.63 KB
/
cilogon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
"""CILogon OAuthAuthenticator for JupyterHub
Uses OAuth 2.0 with cilogon.org (override with CILOGON_HOST)
Caveats:
- For user whitelist/admin purposes, username will be the ePPN by default.
This is typically an email address and may not work as a Unix userid.
Normalization may be required to turn the JupyterHub username into a Unix username.
- Default username_claim of ePPN does not work for all providers,
e.g. generic OAuth such as Google.
Use `c.CILogonOAuthenticator.username_claim = 'email'` to use
email instead of ePPN as the JupyterHub username.
"""
import json
import os
from tornado.auth import OAuth2Mixin
from tornado import gen, web
from tornado.httputil import url_concat
from tornado.httpclient import HTTPRequest, AsyncHTTPClient
from traitlets import Unicode, List, Bool, Union, validate
from jupyterhub.traitlets import Callable
from jupyterhub.auth import LocalAuthenticator
from .oauth2 import OAuthLoginHandler, OAuthenticator
CILOGON_HOST = os.environ.get('CILOGON_HOST') or 'cilogon.org'
class CILogonMixin(OAuth2Mixin):
_OAUTH_AUTHORIZE_URL = "https://%s/authorize" % CILOGON_HOST
_OAUTH_TOKEN_URL = "https://%s/oauth2/token" % CILOGON_HOST
class CILogonLoginHandler(OAuthLoginHandler, CILogonMixin):
"""See http://www.cilogon.org/oidc for general information."""
def authorize_redirect(self, *args, **kwargs):
"""Add idp, skin to redirect params"""
extra_params = kwargs.setdefault('extra_params', {})
if self.authenticator.idp:
extra_params["selected_idp"] = self.authenticator.idp
if self.authenticator.skin:
extra_params["skin"] = self.authenticator.skin
return super().authorize_redirect(*args, **kwargs)
class CILogonOAuthenticator(OAuthenticator):
login_service = "CILogon"
client_id_env = 'CILOGON_CLIENT_ID'
client_secret_env = 'CILOGON_CLIENT_SECRET'
login_handler = CILogonLoginHandler
scope = List(Unicode(), default_value=['openid', 'email', 'org.cilogon.userinfo'],
config=True,
help="""The OAuth scopes to request.
See cilogon_scope.md for details.
At least 'openid' is required.
""",
)
@validate('scope')
def _validate_scope(self, proposal):
"""ensure openid is requested"""
if 'openid' not in proposal.value:
return ['openid'] + proposal.value
return proposal.value
user_check_cb = Callable(
None,
allow_none=True,
help="""
Callable that determines how to rewrite usernames. If this is
set, strip_idp_domain, idp_whitelist are disabled.
callable accepts one argument (username@domain) and returns the desired
username, or None if the user is disallowed
""").tag(config=True)
idp_whitelist = List(
config=True,
help="""A list of IDP which can be stripped from the username after the @ sign.""",
)
strip_idp_domain = Bool(
False,
config=True,
help="""Remove the IDP domain from the username. Note that only domains which
appear in the `idp_whitelist` will be stripped.""",
)
idp = Unicode(
config=True,
help="""The `idp` attribute is the SAML Entity ID of the user's selected
identity provider.
See https://cilogon.org/include/idplist.xml for the list of identity
providers supported by CILogon.
""",
)
skin = Unicode(
config=True,
help="""The `skin` attribute is the name of the custom CILogon interface skin
for your application.
Contact [email protected] to request a custom skin.
""",
)
username_claim = Unicode(
"eppn",
config=True,
help="""The claim in the userinfo response from which to get the JupyterHub username
Examples include: eppn, email
What keys are available will depend on the scopes requested.
See http://www.cilogon.org/oidc for details.
""",
)
@gen.coroutine
def authenticate(self, handler, data=None):
"""We set up auth_state based on additional CILogon info if we
receive it.
"""
code = handler.get_argument("code")
# TODO: Configure the curl_httpclient for tornado
http_client = AsyncHTTPClient()
# Exchange the OAuth code for a CILogon Access Token
# See: http://www.cilogon.org/oidc
headers = {
"Accept": "application/json",
"User-Agent": "JupyterHub",
}
params = dict(
client_id=self.client_id,
client_secret=self.client_secret,
redirect_uri=self.oauth_callback_url,
code=code,
grant_type='authorization_code',
)
url = url_concat("https://%s/oauth2/token" % CILOGON_HOST, params)
req = HTTPRequest(url,
headers=headers,
method="POST",
body=''
)
resp = yield http_client.fetch(req)
token_response = json.loads(resp.body.decode('utf8', 'replace'))
access_token = token_response['access_token']
self.log.info("Access token acquired.")
# Determine who the logged in user is
params = dict(access_token=access_token)
req = HTTPRequest(url_concat("https://%s/oauth2/userinfo" %
CILOGON_HOST, params),
headers=headers
)
resp = yield http_client.fetch(req)
resp_json = json.loads(resp.body.decode('utf8', 'replace'))
username = resp_json.get(self.username_claim)
if not username:
self.log.error("Username claim %s not found in the response: %s",
self.username_claim, sorted(resp_json.keys())
)
raise web.HTTPError(500, "Failed to get username from CILogon")
self.log.info("CILogon callback: %s", self.user_check_cb)
if callable(self.user_check_cb):
username_temp = self.user_check_cb(username)
self.log.info("CILogon converted %s to %s", username, username_temp)
if not username_temp:
self.log.error(
"Username failed user_check_cb", username)
raise web.HTTPError(
500, "Username/Domain not allowed to log in")
username = username_temp
elif self.idp_whitelist:
gotten_name, gotten_idp = username.split('@')
if gotten_idp not in self.idp_whitelist:
self.log.error(
"Trying to login from not whitelisted domain %s", gotten_idp)
raise web.HTTPError(
500, "Trying to login from not whitelisted domain")
if self.strip_idp_domain:
username = gotten_name
userdict = {"name": username}
# Now we set up auth_state
userdict["auth_state"] = auth_state = {}
# Save the token response and full CILogon reply in auth state
# These can be used for user provisioning
# in the Lab/Notebook environment.
auth_state['token_response'] = token_response
# store the whole user model in auth_state.cilogon_user
# keep access_token as well, in case anyone was relying on it
auth_state['access_token'] = access_token
auth_state['cilogon_user'] = resp_json
return userdict
class LocalCILogonOAuthenticator(LocalAuthenticator, CILogonOAuthenticator):
"""A version that mixes in local system user creation"""
pass