diff --git a/src/spaceone/identity/interface/grpc/user_profile.py b/src/spaceone/identity/interface/grpc/user_profile.py index 809b4c56..9f51bf20 100644 --- a/src/spaceone/identity/interface/grpc/user_profile.py +++ b/src/spaceone/identity/interface/grpc/user_profile.py @@ -65,4 +65,4 @@ def get_workspaces(self, request, context): params, metadata = self.parse_request(request, context) user_profile_svc = UserProfileService(metadata) response: dict = user_profile_svc.get_workspaces(params) - return ParseDict(response, workspace_pb2.WorkspacesInfo()) + return self.dict_to_message(response) diff --git a/src/spaceone/identity/model/user_profile/response.py b/src/spaceone/identity/model/user_profile/response.py new file mode 100644 index 00000000..a68c6d0c --- /dev/null +++ b/src/spaceone/identity/model/user_profile/response.py @@ -0,0 +1,35 @@ +from datetime import datetime +from typing import Union, List, Literal +from pydantic import BaseModel + +from spaceone.core import utils + +__all__ = ["MyWorkspaceResponse", "MyWorkspacesResponse"] + +State = Literal["ENABLED", "DISABLED"] + + +class MyWorkspaceResponse(BaseModel): + workspace_id: Union[str, None] = None + name: Union[str, None] = None + state: Union[State, None] = None + role_type: Union[str, None] = None + tags: Union[dict, None] = None + created_by: Union[str, None] = None + reference_id: Union[str, None] = None + is_managed: Union[bool, None] = None + role_id: Union[str, None] = None + domain_id: Union[str, None] = None + created_at: Union[datetime, None] = None + last_synced_at: Union[datetime, None] = None + + def dict(self, *args, **kwargs): + data = super().dict(*args, **kwargs) + data["created_at"] = utils.datetime_to_iso8601(data["created_at"]) + data["last_synced_at"] = utils.datetime_to_iso8601(data.get("last_synced_at")) + return data + + +class MyWorkspacesResponse(BaseModel): + results: List[MyWorkspaceResponse] + total_count: int diff --git a/src/spaceone/identity/service/user_profile_service.py b/src/spaceone/identity/service/user_profile_service.py index b1866356..4245da77 100644 --- a/src/spaceone/identity/service/user_profile_service.py +++ b/src/spaceone/identity/service/user_profile_service.py @@ -23,7 +23,10 @@ from spaceone.identity.model.user_profile.request import * from spaceone.identity.model.user.response import * from spaceone.identity.model.user.database import User +from spaceone.identity.model.role_binding.response import RoleBindingsResponse +from spaceone.identity.model.workspace.response import WorkspaceResponse, WorkspacesResponse from spaceone.identity.model.workspace.response import WorkspacesResponse +from spaceone.identity.model.user_profile.response import MyWorkspaceResponse, MyWorkspacesResponse _LOGGER = logging.getLogger(__name__) @@ -327,7 +330,7 @@ def get(self, params: UserProfileGetRequest) -> Union[UserResponse, dict]: @convert_model def get_workspaces( self, params: UserProfileGetWorkspacesRequest - ) -> Union[WorkspacesResponse, dict]: + ) -> Union[MyWorkspacesResponse, dict]: """Find user Args: params (UserWorkspacesRequest): { @@ -335,7 +338,7 @@ def get_workspaces( 'domain_id': 'str' # injected from auth (required) } Returns: - WorkspacesResponse: + MyWorkspaceResponse: """ rb_mgr = RoleBindingManager() @@ -346,38 +349,35 @@ def get_workspaces( if user_vo.role_type == "DOMAIN_ADMIN": allow_all = True - else: - rb_vos = rb_mgr.filter_role_bindings( - user_id=params.user_id, - domain_id=params.domain_id, - role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], - workspace_id="*", - ) - if rb_vos.count() > 0: - allow_all = True + rb_vos = rb_mgr.filter_role_bindings( + user_id=params.user_id, + domain_id=params.domain_id, + role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) if allow_all: workspace_vos = workspace_mgr.filter_workspaces( domain_id=params.domain_id, state="ENABLED" ) else: - rb_vos = rb_mgr.filter_role_bindings( - user_id=params.user_id, - domain_id=params.domain_id, - role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], - ) - workspace_ids = list(set([rb.workspace_id for rb in rb_vos])) workspace_vos = workspace_mgr.filter_workspaces( workspace_id=workspace_ids, domain_id=params.domain_id, state="ENABLED" ) + role_bindings_info_map = { + rb.workspace_id: rb.to_dict() for rb in rb_vos + } workspaces_info = [workspace_vo.to_dict() for workspace_vo in workspace_vos] - return WorkspacesResponse( - results=workspaces_info, total_count=len(workspaces_info) + my_workspaces_info = self._get_my_workspaces_info(workspaces_info, role_bindings_info_map) + + return MyWorkspacesResponse( + results=my_workspaces_info, total_count=len(my_workspaces_info) ) + # my_workspaces_info = self._get_my_workspaces_info(workspaces_info, role_bindings_info_map) + def _get_domain_name(self, domain_id: str) -> str: domain_vo = self.domain_mgr.get_domain(domain_id) return domain_vo.name @@ -439,3 +439,16 @@ def _generate_temporary_password(): and re.search("[0-9]", random_password) ): return random_password + + @staticmethod + def _get_my_workspaces_info(workspaces_info: list, role_bindings_info_map: dict) -> list: + my_workspaces_info = [] + + for workspace_info in workspaces_info: + if rb_info := role_bindings_info_map.get(workspace_info["workspace_id"]): + workspace_info.update({ + "role_id": rb_info.get("role_id"), + "role_type": rb_info.get("role_type"), + }) + my_workspaces_info.append(workspace_info) + return my_workspaces_info