From c9f93b5b34698040c7756f8ed3580795ce94ceeb Mon Sep 17 00:00:00 2001 From: Libor Pichler Date: Thu, 19 Dec 2024 16:46:52 +0100 Subject: [PATCH] Update role in internal endpoint --- rbac/internal/urls.py | 1 + rbac/internal/views.py | 68 ++++++++++++++ tests/internal/test_views.py | 171 +++++++++++++++++++++++++++++++++++ 3 files changed, 240 insertions(+) diff --git a/rbac/internal/urls.py b/rbac/internal/urls.py index bf42e3a6..585d38b0 100644 --- a/rbac/internal/urls.py +++ b/rbac/internal/urls.py @@ -66,6 +66,7 @@ path("api/tenant/unmodified/", views.list_unmodified_tenants), path("api/tenant/", views.list_tenants), path("api/tenant//", views.tenant_view), + path("api/roles//", views.roles), path("api/migrations/run/", views.run_migrations), path("api/migrations/progress/", views.migration_progress), path("api/seeds/run/", views.run_seeds), diff --git a/rbac/internal/views.py b/rbac/internal/views.py index b3136868..8ad80bbd 100644 --- a/rbac/internal/views.py +++ b/rbac/internal/views.py @@ -710,6 +710,74 @@ def reset_imported_tenants(request: HttpRequest) -> HttpResponse: return HttpResponse("Invalid method", status=405) +ALLOWED_ROLE_UPDATE_ATTRIBUTES = {"system", "platform_default", "admin_default"} + + +def str_to_bool(value: str) -> bool: + """Convert string to bool.""" + return value.strip().lower() == "true" + + +def handle_error(message: str, status_response: int) -> HttpResponse: + """Return HttpResponse object.""" + return HttpResponse(json.dumps({"error": message}), content_type="application/json", status=status_response) + + +def get_role_response(role: Role) -> HttpResponse: + """Return role response in HttpResponse object.""" + response_data = { + "message": "Role retrieved successfully", + "role": { + "uuid": str(role.uuid), + "name": role.name, + "system": role.system, + "admin_default": role.admin_default, + "platform_default": role.platform_default, + }, + } + return HttpResponse(json.dumps(response_data), content_type="application/json", status=200) + + +def roles(request, uuid: str) -> HttpResponse: + """Update or get role. + + GET /_private/api/role/uuid-uuid-uuid-uuid/ + PUT /_private/api/role/uuid-uuid-uuid-uuid/ + { + "system": "true" + } + """ + try: + role = get_object_or_404(Role, uuid=uuid) + + if request.method == "PUT": + body = json.loads(request.body) + + invalid_keys = set(body.keys()) - ALLOWED_ROLE_UPDATE_ATTRIBUTES + if invalid_keys: + return handle_error(f"Invalid attributes: {', '.join(invalid_keys)}", 400) + + if "system" in body: + role.system = str_to_bool(body["system"]) + + if "platform_default" in body: + role.platform_default = str_to_bool(body["platform_default"]) + + if "admin_default" in body: + role.admin_default = str_to_bool(body["admin_default"]) + + role.save() + return get_role_response(role) + + elif request.method == "GET": + return get_role_response(role) + + return handle_error("Invalid request method", 405) + + except Exception as e: + return handle_error(str(e), 500) + + def trigger_error(request): """Trigger an error to confirm Sentry is working.""" raise SentryDiagnosticError diff --git a/tests/internal/test_views.py b/tests/internal/test_views.py index e099ee34..5d3d265a 100644 --- a/tests/internal/test_views.py +++ b/tests/internal/test_views.py @@ -1123,3 +1123,174 @@ def test_reset_imported_tenants_excludes_delete(self, delay): self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertIn("Deleted 2 tenants.", logs.output[0]) self.assertEqual(4, Tenant.objects.count()) + + def test_update_system_flag_in_role(self): + """Test that we can update a role.""" + tenant = Tenant.objects.create(tenant_name="1234", org_id="1234") + custom_role = Role.objects.create( + name="role 1", system=True, tenant=tenant, platform_default=False, admin_default=False + ) + + request_body = { + "system": "false", + } + + json_request_body = json.dumps(request_body) + + self.assertTrue(custom_role.system) + self.assertFalse(custom_role.platform_default) + self.assertFalse(custom_role.admin_default) + + response = self.client.put( + f"/_private/api/roles/{custom_role.uuid}/", + json_request_body, + **self.request.META, + content_type="application/json", + ) + + custom_role.refresh_from_db() + + self.assertFalse(custom_role.system) + self.assertFalse(custom_role.platform_default) + self.assertFalse(custom_role.admin_default) + self.assertEqual(response.status_code, 200) + + def test_update_platform_default_flag_in_role(self): + """Test that we can update a role.""" + tenant = Tenant.objects.create(tenant_name="1234", org_id="1234") + custom_role = Role.objects.create( + name="role 1", system=False, tenant=tenant, platform_default=True, admin_default=False + ) + + request_body = { + "platform_default": "false", + } + + json_request_body = json.dumps(request_body) + + self.assertFalse(custom_role.system) + self.assertTrue(custom_role.platform_default) + self.assertFalse(custom_role.admin_default) + + response = self.client.put( + f"/_private/api/roles/{custom_role.uuid}/", + json_request_body, + **self.request.META, + content_type="application/json", + ) + + custom_role.refresh_from_db() + + self.assertFalse(custom_role.system, False) + self.assertFalse(custom_role.platform_default, False) + self.assertFalse(custom_role.admin_default, False) + self.assertEqual(response.status_code, 200) + + def test_update_admin_default_flag_in_role(self): + """Test that we can update a role.""" + tenant = Tenant.objects.create(tenant_name="1234", org_id="1234") + custom_role = Role.objects.create( + name="role 1", system=False, tenant=tenant, platform_default=False, admin_default=True + ) + + request_body = { + "admin_default": "false", + } + + json_request_body = json.dumps(request_body) + + self.assertFalse(custom_role.system) + self.assertFalse(custom_role.platform_default) + self.assertTrue(custom_role.admin_default) + + response = self.client.put( + f"/_private/api/roles/{custom_role.uuid}/", + json_request_body, + **self.request.META, + content_type="application/json", + ) + + custom_role.refresh_from_db() + + self.assertFalse(custom_role.system) + self.assertFalse(custom_role.platform_default) + self.assertFalse(custom_role.admin_default) + self.assertEqual(response.status_code, 200) + + def test_update_role(self): + """Test that we can update a role.""" + tenant = Tenant.objects.create(tenant_name="1234", org_id="1234") + custom_role = Role.objects.create( + name="role 1", system=False, tenant=tenant, platform_default=False, admin_default=False + ) + + request_body = {"admin_default": "true", "system": "true", "platform_default": "true"} + + json_request_body = json.dumps(request_body) + + self.assertFalse(custom_role.system) + self.assertFalse(custom_role.platform_default) + self.assertFalse(custom_role.admin_default) + + response = self.client.put( + f"/_private/api/roles/{custom_role.uuid}/", + json_request_body, + **self.request.META, + content_type="application/json", + ) + + custom_role.refresh_from_db() + + self.assertTrue(custom_role.system) + self.assertTrue(custom_role.platform_default) + self.assertTrue(custom_role.admin_default) + self.assertEqual(response.status_code, 200) + + def test_update_role_fails_disallowed_attributes(self): + """Test that we can update a role.""" + tenant = Tenant.objects.create(tenant_name="1234", org_id="1234") + custom_role = Role.objects.create( + name="role 1", system=False, tenant=tenant, platform_default=False, admin_default=False + ) + + request_body = {"admin_default": "true", "system": "true", "name": "platform_default"} + + json_request_body = json.dumps(request_body) + + self.assertFalse(custom_role.system) + self.assertFalse(custom_role.platform_default) + self.assertFalse(custom_role.admin_default) + + response = self.client.put( + f"/_private/api/roles/{custom_role.uuid}/", + json_request_body, + **self.request.META, + content_type="application/json", + ) + + custom_role.refresh_from_db() + + self.assertFalse(custom_role.system) + self.assertFalse(custom_role.platform_default) + self.assertFalse(custom_role.admin_default) + self.assertEqual(response.status_code, 400) + + def test_fetch_role(self): + """Test that we can update a role.""" + tenant = Tenant.objects.create(tenant_name="1234", org_id="1234") + custom_role = Role.objects.create( + name="role 1", system=False, tenant=tenant, platform_default=False, admin_default=False + ) + + response = self.client.get( + f"/_private/api/roles/{custom_role.uuid}/", + **self.request.META, + content_type="application/json", + ) + + body = json.loads(response.content.decode()) + role = body["role"] + self.assertFalse(role["system"]) + self.assertFalse(role["platform_default"]) + self.assertFalse(role["admin_default"]) + self.assertEqual(response.status_code, 200)