Skip to content

Commit

Permalink
Add test case for DAB RBAC (#2239)
Browse files Browse the repository at this point in the history
Add a new test case that checks custom object role assignment to a team.

No-Issue
  • Loading branch information
cutwater authored Aug 21, 2024
1 parent 7635f30 commit cf98a67
Showing 1 changed file with 117 additions and 24 deletions.
141 changes: 117 additions & 24 deletions galaxy_ng/tests/integration/dab/test_dab_rbac_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from galaxykit import GalaxyClient
from galaxykit.utils import GalaxyClientError


GALAXY_API_PATH_PREFIX = "/api/galaxy" # cant import from settings on integration tests


Expand Down Expand Up @@ -108,7 +109,7 @@ def delete_role():
raise RuntimeError(f"Could not figure out how to delete {role}")
assert response.status_code == HTTPStatus.NO_CONTENT

roles_list = gc.get(f"{url_base}?name={data['name']}")
roles_list = gc.get(url_base, params={"name": data['name']})
if roles_list["count"] > 1:
raise RuntimeError(f"Found too many {url_base} with expected name {data['name']}")
if roles_list["count"] == 1:
Expand Down Expand Up @@ -184,7 +185,10 @@ def assert_object_role_assignments(gc, user, namespace, expected=0):
assert len(data["users"]) == expected

# Assure the assignment shows up in the DAB RBAC API
data = gc.get(f"_ui/v2/role_user_assignments/?user={user['id']}&object_id={namespace['id']}")
data = gc.get(
"_ui/v2/role_user_assignments/",
params={"user": user['id'], "object_id": namespace['id']}
)
assert data["count"] == expected

if not expected:
Expand Down Expand Up @@ -226,12 +230,12 @@ def test_create_custom_namespace_system_admin_role(custom_role_factory, galaxy_c
assert system_ns_role["name"] == NS_FIXTURE_DATA["name"]

gc = galaxy_client("admin")
roles_list = gc.get(f"{DAB_ROLE_URL}?name={NS_FIXTURE_DATA['name']}")
roles_list = gc.get(DAB_ROLE_URL, params={"name": NS_FIXTURE_DATA['name']})
assert roles_list["count"] == 1
dab_role = roles_list["results"][0]
assert set(dab_role["permissions"]) == set(NS_FIXTURE_DATA["permissions"])

roles_list = gc.get(f"{PULP_ROLE_URL}?name={NS_FIXTURE_DATA['name']}")
roles_list = gc.get(PULP_ROLE_URL, params={"name": NS_FIXTURE_DATA['name']})
assert roles_list["count"] == 1
pulp_role = roles_list["results"][0]
assert set(pulp_role["permissions"]) == set(NS_FIXTURE_DATA["permissions"])
Expand Down Expand Up @@ -342,8 +346,6 @@ def test_give_team_custom_role_system(
assert role_definition_resp["name"] == NS_FIXTURE_DATA["name"]
assert role_definition_resp["description"] == NS_FIXTURE_DATA["description"]

# assert_system_role_team_assignments(admin_client, user, expected=1)

# Step 3: Check that user with assigned system role has write access to a namespace.

response = user_client.put(
Expand Down Expand Up @@ -385,7 +387,7 @@ def test_give_user_custom_role_object(
by_role_api,
by_assignment_api
):
gc = galaxy_client("admin")
admin_client = galaxy_client("admin")

if by_role_api == 'pulp' and by_assignment_api == 'dab':
pytest.skip(
Expand All @@ -398,27 +400,30 @@ def test_give_user_custom_role_object(
data["name"] = "galaxy.namespace_custom_dab_object_role"
data["content_type"] = "galaxy.namespace"
custom_obj_role_dab = custom_role_factory(data)
custom_obj_role_pulp = gc.get(f"{PULP_ROLE_URL}?name={data['name']}")['results'][0]
custom_obj_role_pulp = admin_client.get(
PULP_ROLE_URL, params={"name": data['name']}
)['results'][0]
else:
data = NS_FIXTURE_DATA.copy()
data["name"] = "galaxy.namespace_custom_pulp_object_role"
custom_obj_role_pulp = custom_role_factory(data, url_base=PULP_ROLE_URL)
custom_obj_role_dab = gc.get(f"{DAB_ROLE_URL}?name={data['name']}")['results'][0]
custom_obj_role_dab = admin_client.get(
DAB_ROLE_URL, params={"name": data['name']}
)['results'][0]

# NOTE: If basic_user doesn't suffice, the dab proxy supports creating users
# and syncing them down to galaxy

user_r = gc.get("_ui/v2/users/")
assert user_r["count"] > 0
user = user_r["results"][0]
user_client = galaxy_client("basic_user")
user = user_client.get("_ui/v1/me/")

# sanity - assignments should not exist at start of this test
assert_object_role_assignments(gc, user, namespace, 0)
assert_object_role_assignments(admin_client, user, namespace, 0)

# Give the user permission to the namespace object
dab_assignment = None
if by_assignment_api == "dab":
dab_assignment = gc.post(
dab_assignment = admin_client.post(
"_ui/v2/role_user_assignments/",
body={
"role_definition": custom_obj_role_dab["id"],
Expand All @@ -436,18 +441,18 @@ def test_give_user_custom_role_object(
}
],
}
gc.put(f"_ui/v1/my-namespaces/{namespace['name']}/", body=payload)
admin_client.put(f"_ui/v1/my-namespaces/{namespace['name']}/", body=payload)

# TODO: make a request as the user and see that it works
# NOTE: Check if user can have a minimal permission to modify namespace attributes
# (e.g. description, company, etc.)
# NOTE: Check after permission is revoked, user cannot perform same operations

assert_object_role_assignments(gc, user, namespace, 1)
assert_object_role_assignments(admin_client, user, namespace, 1)

# Remove the permission from before
if by_assignment_api == "dab":
response = gc.delete(
response = admin_client.delete(
f"_ui/v2/role_user_assignments/{dab_assignment['id']}/",
parse_json=False
)
Expand All @@ -457,9 +462,96 @@ def test_give_user_custom_role_object(
"name": namespace["name"],
"users": [],
}
gc.put(f"_ui/v1/my-namespaces/{namespace['name']}/", body=payload)
admin_client.put(f"_ui/v1/my-namespaces/{namespace['name']}/", body=payload)

assert_object_role_assignments(admin_client, user, namespace, 0)


@pytest.mark.deployment_standalone
def test_give_team_custom_role_object(
settings,
galaxy_client,
custom_role_factory,
namespace,
team,
):
if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False:
pytest.skip("galaxykit uses drf tokens, which bypass JWT auth and claims processing")

# Step 0: Setup test.

admin_client = galaxy_client("admin")

user_client = galaxy_client("basic_user")
user = user_client.get("_ui/v1/me/")
add_user_to_team(admin_client, user["id"], team["id"])

data = {
"name": "galaxy.namespace_custom_object_role",
"content_type": "galaxy.namespace",
"description": "Galaxy namespace custom object role",
"permissions": [
"galaxy.change_namespace",
"galaxy.delete_namespace",
"galaxy.view_namespace",
],
}
custom_obj_role_dab = custom_role_factory(data)

# Step 1: Check that regular user doesn't have write access to a namespace.
with pytest.raises(GalaxyClientError) as ctx:
user_client.put(
f"_ui/v1/namespaces/{namespace['name']}/", body={
**namespace,
"company": "Test RBAC Company 1",
}
)
assert ctx.value.response.status_code == HTTPStatus.FORBIDDEN

# Step 2: Assign object role to a team

admin_client.post(
"_ui/v2/role_team_assignments/",
body={
"team": team["id"],
"role_definition": custom_obj_role_dab["id"],
"content_type": "galaxy.namespace",
"object_id": str(namespace["id"]),
},
)

assert_object_role_assignments(gc, user, namespace, 0)
# Step 3: Check that user with assigned system role has write access to a namespace.

namespace = user_client.get(f"_ui/v1/namespaces/{namespace['name']}/")
namespace = user_client.put(
f"_ui/v1/namespaces/{namespace['name']}/", body={
**namespace,
"company": "Test RBAC Company 2",
}
)
assert namespace["company"] == "Test RBAC Company 2"

role_assignment = admin_client.get(
"_ui/v2/role_team_assignments/",
params={"object_id": namespace["id"]}
)["results"][0]

response = admin_client.delete(
f"_ui/v2/role_team_assignments/{role_assignment['id']}/",
parse_json=False
)
assert response.status_code == HTTPStatus.NO_CONTENT

# Step 5: Check that user with revoked object role doesn't have write access to a namespace.

with pytest.raises(GalaxyClientError) as ctx:
user_client.put(
f"_ui/v1/namespaces/{namespace['name']}/", body={
**namespace,
"company": "Test RBAC Company 3",
}
)
assert ctx.value.response.status_code == HTTPStatus.FORBIDDEN


def test_object_role_permission_validation(galaxy_client, custom_role_factory, namespace):
Expand Down Expand Up @@ -501,7 +593,8 @@ def _rf(user_id, group_id, expected=True):
assert group["name"] not in [g["name"] for g in user["groups"]]

assignment_r = gc.get(
f"_ui/v2/role_user_assignments/?user={user['id']}&content_type__model=team"
"_ui/v2/role_user_assignments/",
params={"user": user['id'], "content_type__model": "team"},
)
group_ids = [assignment["object_id"] for assignment in assignment_r["results"]]
if expected:
Expand All @@ -515,11 +608,11 @@ def _rf(user_id, group_id, expected=True):
def user_and_group(request, galaxy_client):
"Return a tuple of a user and group where the user is not in the group"
gc = galaxy_client("admin")
user_r = gc.get("_ui/v2/users/?username=jdoe")
user_r = gc.get("_ui/v2/users/", params={"username": "jdoe"})
assert user_r["count"] > 0
user = user_r["results"][0]

group_r = gc.get("_ui/v2/groups/?name=ship_crew")
group_r = gc.get("_ui/v2/groups/", params={"name": "ship_crew"})
assert group_r["count"] > 0
group = group_r["results"][0]

Expand Down Expand Up @@ -563,12 +656,12 @@ def test_team_member_sync_from_dab_to_pulp(galaxy_client, assert_user_in_group,
assert_user_in_group(user["id"], group["id"], expected=False)

# Get the DAB team member role
rd_list = gc.get("_ui/v2/role_definitions/?name=Galaxy Team Member")
rd_list = gc.get("_ui/v2/role_definitions/", params={"name": "Galaxy Team Member"})
assert rd_list["count"] == 1
rd = rd_list["results"][0]

# Because group and team are different, we look up the team by name, for DAB
team_list = gc.get(f"_ui/v2/teams/?name={group['name']}")
team_list = gc.get("_ui/v2/teams/", params={"name": group['name']})
assert team_list["count"] == 1, team_list
team = team_list["results"][0]

Expand Down

0 comments on commit cf98a67

Please sign in to comment.