Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stats #1629

Merged
merged 2 commits into from
Nov 5, 2024
Merged

Stats #1629

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 6 additions & 49 deletions IM/InfrastructureInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,51 +685,13 @@ def _is_authorized(self, self_im_auth, auth):
"""
Checks if the auth data provided is authorized to access this infrastructure
"""
res = False
for other_im_auth in auth.getAuthInfo("InfrastructureManager"):
res = True

if 'token' in self_im_auth:
if 'token' not in other_im_auth:
res = False
break
decoded_token = JWT().get_info(other_im_auth['token'])
issuer = str(decoded_token['iss'])
if not issuer.endswith('/'):
issuer += '/'
password = issuer + str(decoded_token['sub'])
# check that the token provided is associated with the current owner of the inf.
if self_im_auth['password'] != password:
res = False
break

# Username must start with the defined prefix
if not other_im_auth['username'].startswith(InfrastructureInfo.OPENID_USER_PREFIX):
res = False
break

# In case of OIDC token update it in each call to get a fresh version
self_im_auth['token'] = other_im_auth['token']
else:
for elem in ['username', 'password']:
if elem not in other_im_auth:
res = False
break
if elem not in self_im_auth:
InfrastructureInfo.logger.error("Inf ID %s has not elem %s in the auth data" % (self.id, elem))
if self_im_auth[elem] != other_im_auth[elem]:
res = False
break

if (self_im_auth['username'].startswith(InfrastructureInfo.OPENID_USER_PREFIX) and
'token' not in other_im_auth):
# This is a OpenID user do not enable to get data using user/pass creds
InfrastructureInfo.logger.warning("Inf ID %s: A non OpenID user tried to access it." % self.id)
res = False
break

if res:
return res
if other_im_auth.get("admin"):
return True
if ((self_im_auth.get("token") is None or other_im_auth.get("token") is not None) and
self_im_auth.get("username") == other_im_auth.get("username") and
self_im_auth.get("password") == other_im_auth.get("password")):
return True

def is_authorized(self, auth):
"""
Expand All @@ -739,11 +701,6 @@ def is_authorized(self, auth):
for self_im_auth in self.auth.getAuthInfo("InfrastructureManager"):
if self._is_authorized(self_im_auth, auth):
return True
if Config.ADMIN_USER:
admin_auth = dict(Config.ADMIN_USER)
admin_auth["type"] = "InfrastructureManager"
if self._is_authorized(admin_auth, auth):
return True

return False
else:
Expand Down
4 changes: 4 additions & 0 deletions IM/InfrastructureList.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ def _gen_where_from_auth(auth):
like = ""
if auth:
for elem in auth.getAuthInfo('InfrastructureManager'):
if elem.get("admin"):
return ""
if elem.get("username"):
if like:
like += " or "
Expand All @@ -266,6 +268,8 @@ def _gen_filter_from_auth(auth):
like = ""
if auth:
for elem in auth.getAuthInfo('InfrastructureManager'):
if elem.get("admin"):
return {}
if elem.get("username"):
if like:
like += "|"
Expand Down
9 changes: 9 additions & 0 deletions IM/InfrastructureManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,7 @@ def check_auth_data(auth):
raise InvaliddUserException("No credentials provided for the InfrastructureManager.")

for im_auth_item in im_auth:
im_auth_item['admin'] = False
if Config.FORCE_OIDC_AUTH and "token" not in im_auth_item:
raise InvaliddUserException("No token provided for the InfrastructureManager.")

Expand All @@ -1567,6 +1568,14 @@ def check_auth_data(auth):
else:
raise InvaliddUserException("No username nor token for the InfrastructureManager.")

if Config.ADMIN_USER:
admin_auth = dict(Config.ADMIN_USER)
admin_auth["type"] = "InfrastructureManager"
if ((im_auth_item.get("token") is None or admin_auth.get("token") is not None) and
im_auth_item.get("username") == admin_auth.get("username") and
im_auth_item.get("password") == admin_auth.get("password")):
im_auth_item['admin'] = True

if Config.SINGLE_SITE:
vmrc_auth = auth.getAuthInfo("VMRC")
single_site_auth = auth.getAuthInfo(Config.SINGLE_SITE_TYPE)
Expand Down
35 changes: 15 additions & 20 deletions test/unit/test_im_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1157,8 +1157,9 @@ def test_check_oidc_groups(self, openidclient):

Config.OIDC_GROUPS = []

def test_inf_auth_with_token(self):
im_auth = {"token": (self.gen_token())}
@patch('IM.InfrastructureManager.OpenIDClient.get_user_info_request')
def test_inf_auth_with_token(self, get_user_info_request):
im_auth = {"token": (self.gen_token(exp=120))}
im_auth['username'] = InfrastructureInfo.OPENID_USER_PREFIX + "micafer"
im_auth['password'] = "https://iam-test.indigo-datacloud.eu/user_sub"
# Check that a user/pass cred cannot access OpenID ones
Expand All @@ -1176,37 +1177,31 @@ def test_inf_auth_with_token(self):
self.assertEqual(str(ex.exception), "No token provided for the InfrastructureManager.")
Config.FORCE_OIDC_AUTH = False

Config.OIDC_ISSUERS = ["https://iam-test.indigo-datacloud.eu/"]
user_auth = Authentication([{'id': 'im', 'type': 'InfrastructureManager',
'token': im_auth['token']}])
get_user_info_request.return_value = True, {'sub': 'micafer'}
inf = InfrastructureInfo()
inf.id = "1"
inf.auth = user_auth
user_auth = IM.check_auth_data(user_auth)
res = inf.is_authorized(user_auth)
self.assertFalse(res)
self.assertTrue(res)

get_user_info_request.return_value = True, {'sub': 'user_sub'}
user_auth1 = Authentication([{'id': 'im', 'type': 'InfrastructureManager',
'username': im_auth['username'],
'password': im_auth['password'],
'token': im_auth['token']}])
'token': self.gen_token(user_sub="user_sub", exp=120)}])
user_auth1 = IM.check_auth_data(user_auth1)
res = inf.is_authorized(user_auth1)
self.assertTrue(res)

inf.auth = user_auth1
new_token = self.gen_token()
user_auth2 = Authentication([{'id': 'im', 'type': 'InfrastructureManager',
'username': im_auth['username'],
'password': im_auth['password'],
'token': new_token}])
res = inf.is_authorized(user_auth2)
self.assertTrue(res)
self.assertEqual(inf.auth.getAuthInfo("InfrastructureManager")[0]['token'], new_token)
self.assertFalse(res)

inf.auth = user_auth1
Config.ADMIN_USER = {"username": "",
"password": "https://iam-test.indigo-datacloud.eu/admin_user",
"token": ""}
admin_auth = Authentication([{'id': 'im', 'type': 'InfrastructureManager',
'username': InfrastructureInfo.OPENID_USER_PREFIX + "admin",
'password': "https://iam-test.indigo-datacloud.eu/admin_user",
'token': self.gen_token(user_sub="admin_user")}])
'token': self.gen_token(user_sub="admin_user", exp=120)}])
admin_auth = IM.check_auth_data(admin_auth)
res = inf.is_authorized(admin_auth)
self.assertTrue(res)

Expand Down