Skip to content

Commit

Permalink
Improve auth validation code
Browse files Browse the repository at this point in the history
  • Loading branch information
micafer committed Nov 5, 2024
1 parent b6b9f88 commit 1e78e96
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 69 deletions.
55 changes: 6 additions & 49 deletions IM/InfrastructureInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,51 +685,13 @@ def _is_authorized(self, self_im_auth, auth):
"""
Checks if the auth data provided is authorized to access this infrastructure
"""
res = False
for other_im_auth in auth.getAuthInfo("InfrastructureManager"):
res = True

if 'token' in self_im_auth:
if 'token' not in other_im_auth:
res = False
break
decoded_token = JWT().get_info(other_im_auth['token'])
issuer = str(decoded_token['iss'])
if not issuer.endswith('/'):
issuer += '/'
password = issuer + str(decoded_token['sub'])
# check that the token provided is associated with the current owner of the inf.
if self_im_auth['password'] != password:
res = False
break

# Username must start with the defined prefix
if not other_im_auth['username'].startswith(InfrastructureInfo.OPENID_USER_PREFIX):
res = False
break

# In case of OIDC token update it in each call to get a fresh version
self_im_auth['token'] = other_im_auth['token']
else:
for elem in ['username', 'password']:
if elem not in other_im_auth:
res = False
break
if elem not in self_im_auth:
InfrastructureInfo.logger.error("Inf ID %s has not elem %s in the auth data" % (self.id, elem))
if self_im_auth[elem] != other_im_auth[elem]:
res = False
break

if (self_im_auth['username'].startswith(InfrastructureInfo.OPENID_USER_PREFIX) and
'token' not in other_im_auth):
# This is a OpenID user do not enable to get data using user/pass creds
InfrastructureInfo.logger.warning("Inf ID %s: A non OpenID user tried to access it." % self.id)
res = False
break

if res:
return res
if self_im_auth.get("admin"):
return True
if ((self_im_auth.get("token") is None or other_im_auth.get("token") is not None) and
self_im_auth.get("username") == other_im_auth.get("username") and
self_im_auth.get("password") == other_im_auth.get("password")):
return True

def is_authorized(self, auth):
"""
Expand All @@ -739,11 +701,6 @@ def is_authorized(self, auth):
for self_im_auth in self.auth.getAuthInfo("InfrastructureManager"):
if self._is_authorized(self_im_auth, auth):
return True
if Config.ADMIN_USER:
admin_auth = dict(Config.ADMIN_USER)
admin_auth["type"] = "InfrastructureManager"
if self._is_authorized(admin_auth, auth):
return True

return False
else:
Expand Down
9 changes: 9 additions & 0 deletions IM/InfrastructureManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,7 @@ def check_auth_data(auth):
raise InvaliddUserException("No credentials provided for the InfrastructureManager.")

for im_auth_item in im_auth:
im_auth_item['admin'] = False
if Config.FORCE_OIDC_AUTH and "token" not in im_auth_item:
raise InvaliddUserException("No token provided for the InfrastructureManager.")

Expand All @@ -1567,6 +1568,14 @@ def check_auth_data(auth):
else:
raise InvaliddUserException("No username nor token for the InfrastructureManager.")

if Config.ADMIN_USER:
admin_auth = dict(Config.ADMIN_USER)
admin_auth["type"] = "InfrastructureManager"
if ((im_auth_item.get("token") is None or admin_auth.get("token") is not None) and
im_auth_item.get("username") == admin_auth.get("username") and
im_auth_item.get("password") == admin_auth.get("password")):
im_auth['admin'] = True

if Config.SINGLE_SITE:
vmrc_auth = auth.getAuthInfo("VMRC")
single_site_auth = auth.getAuthInfo(Config.SINGLE_SITE_TYPE)
Expand Down
35 changes: 15 additions & 20 deletions test/unit/test_im_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1157,8 +1157,9 @@ def test_check_oidc_groups(self, openidclient):

Config.OIDC_GROUPS = []

def test_inf_auth_with_token(self):
im_auth = {"token": (self.gen_token())}
@patch('IM.InfrastructureManager.OpenIDClient.get_user_info_request')
def test_inf_auth_with_token(self, get_user_info_request):
im_auth = {"token": (self.gen_token(exp=120))}
im_auth['username'] = InfrastructureInfo.OPENID_USER_PREFIX + "micafer"
im_auth['password'] = "https://iam-test.indigo-datacloud.eu/user_sub"
# Check that a user/pass cred cannot access OpenID ones
Expand All @@ -1176,37 +1177,31 @@ def test_inf_auth_with_token(self):
self.assertEqual(str(ex.exception), "No token provided for the InfrastructureManager.")
Config.FORCE_OIDC_AUTH = False

Config.OIDC_ISSUERS = ["https://iam-test.indigo-datacloud.eu/"]
user_auth = Authentication([{'id': 'im', 'type': 'InfrastructureManager',
'token': im_auth['token']}])
get_user_info_request.return_value = True, {'sub': 'micafer'}
inf = InfrastructureInfo()
inf.id = "1"
inf.auth = user_auth
user_auth = IM.check_auth_data(user_auth)
res = inf.is_authorized(user_auth)
self.assertFalse(res)
self.assertTrue(res)

get_user_info_request.return_value = True, {'sub': 'user_sub'}
user_auth1 = Authentication([{'id': 'im', 'type': 'InfrastructureManager',
'username': im_auth['username'],
'password': im_auth['password'],
'token': im_auth['token']}])
'token': self.gen_token(user_sub="user_sub", exp=120)}])
user_auth1 = IM.check_auth_data(user_auth1)
res = inf.is_authorized(user_auth1)
self.assertTrue(res)

inf.auth = user_auth1
new_token = self.gen_token()
user_auth2 = Authentication([{'id': 'im', 'type': 'InfrastructureManager',
'username': im_auth['username'],
'password': im_auth['password'],
'token': new_token}])
res = inf.is_authorized(user_auth2)
self.assertTrue(res)
self.assertEqual(inf.auth.getAuthInfo("InfrastructureManager")[0]['token'], new_token)
self.assertFalse(res)

inf.auth = user_auth1
Config.ADMIN_USER = {"username": "",
"password": "https://iam-test.indigo-datacloud.eu/admin_user",
"token": ""}
admin_auth = Authentication([{'id': 'im', 'type': 'InfrastructureManager',
'username': InfrastructureInfo.OPENID_USER_PREFIX + "admin",
'password': "https://iam-test.indigo-datacloud.eu/admin_user",
'token': self.gen_token(user_sub="admin_user")}])
'token': self.gen_token(user_sub="admin_user", exp=120)}])
admin_auth = IM.check_auth_data(admin_auth)
res = inf.is_authorized(admin_auth)
self.assertTrue(res)

Expand Down

0 comments on commit 1e78e96

Please sign in to comment.