diff --git a/src/eduid/common/clients/oidc_client/__init__.py b/src/eduid/common/clients/oidc_client/__init__.py new file mode 100644 index 000000000..73c8b5f0c --- /dev/null +++ b/src/eduid/common/clients/oidc_client/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- +__author__ = "lundberg" diff --git a/src/eduid/common/clients/oidc_client/base.py b/src/eduid/common/clients/oidc_client/base.py new file mode 100644 index 000000000..6f913acf7 --- /dev/null +++ b/src/eduid/common/clients/oidc_client/base.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- + +from pydantic import AnyUrl, BaseModel, Field + +__author__ = "lundberg" + + +class AuthlibClientConfig(BaseModel): + client_id: str + client_secret: str + issuer: AnyUrl + code_challenge_method: str = Field(default="S256") + acr_values: list[str] = Field(default_factory=list) + scopes: list[str] = Field(default=["openid"]) diff --git a/src/eduid/common/config/base.py b/src/eduid/common/config/base.py index 906148af8..a1f905fbf 100644 --- a/src/eduid/common/config/base.py +++ b/src/eduid/common/config/base.py @@ -471,6 +471,7 @@ class ProofingConfigMixin(FrontendActionMixin): foreign_eid_proofing_version: str = Field(default="2022v1") svipe_id_proofing_version: str = Field(default="2023v2") bankid_proofing_version: str = Field(default="2023v1") + freja_eid_proofing_version: str = Field(default="2024v1") # security key proofing security_key_proofing_method: CredentialProofingMethod = Field(default=CredentialProofingMethod.SWAMID_AL3_MFA) diff --git a/src/eduid/userdb/credentials/external.py b/src/eduid/userdb/credentials/external.py index 9a0d65876..0ec08284c 100644 --- a/src/eduid/userdb/credentials/external.py +++ b/src/eduid/userdb/credentials/external.py @@ -16,6 +16,7 @@ class TrustFramework(str, Enum): EIDAS = "EIDAS" SVIPE = "SVIPE" BANKID = "BANKID" + FREJA = "FREJA" class ExternalCredential(Credential): diff --git a/src/eduid/userdb/identity.py b/src/eduid/userdb/identity.py index e0d180fe7..c595a4093 100644 --- a/src/eduid/userdb/identity.py +++ b/src/eduid/userdb/identity.py @@ -19,6 +19,7 @@ class IdentityType(str, Enum): NIN = "nin" EIDAS = "eidas" SVIPE = "svipe" + FREJA = "freja" class IdentityProofingMethod(str, Enum): @@ -29,6 +30,7 @@ class IdentityProofingMethod(str, Enum): SWEDEN_CONNECT = "swedenconnect" TELEADRESS = "TeleAdress" BANKID = "bankid" + FREJA_EID = "freja_eid" class IdentityElement(VerifiedElement, ABC): @@ -84,6 +86,8 @@ def get_missing_proofing_method(self) -> Optional[IdentityProofingMethod]: return IdentityProofingMethod.SE_LEG case "svipe_id": return IdentityProofingMethod.SVIPE_ID + case "freja_eid": + return IdentityProofingMethod.FREJA_EID case _: logger.warning(f"Unknown verified_by value: {self.verified_by}") return None @@ -184,6 +188,39 @@ def unique_value(self) -> str: return self.svipe_id +class FrejaRegistrationLevel(str, Enum): + EXTENDED = "EXTENDED" + PLUS = "PLUS" + + +class FrejaIdentity(ForeignIdentityElement): + """ + Element that is used as a Freja identity for a user + + Properties of FrejaIdentity: + + user_id + personal_identity_number + registration_level + country_code + """ + + identity_type: Literal[IdentityType.FREJA] = IdentityType.FREJA + # claim: https://frejaeid.com/oidc/scopes/relyingPartyUserId + # A unique, user-specific value that allows the Relying Party to identify the same user across multiple sessions + user_id: str + personal_identity_number: Optional[str] = None + registration_level: FrejaRegistrationLevel + + @property + def unique_key_name(self) -> str: + return "user_id" + + @property + def unique_value(self) -> str: + return self.user_id + + class IdentityList(VerifiedElementList[IdentityElement]): """ Hold a list of IdentityElement instances. @@ -200,6 +237,8 @@ def from_list_of_dicts(cls: type[IdentityList], items: list[dict[str, Any]]) -> elements.append(EIDASIdentity.from_dict(item)) elif _type == IdentityType.SVIPE.value: elements.append(SvipeIdentity.from_dict(item)) + elif _type == IdentityType.FREJA.value: + elements.append(FrejaIdentity.from_dict(item)) else: raise ValueError(f"identity_type {_type} not valid") return cls(elements=elements) @@ -236,6 +275,13 @@ def svipe(self) -> Optional[SvipeIdentity]: return _svipe[0] return None + @property + def freja(self) -> Optional[FrejaIdentity]: + _freja = self.filter(FrejaIdentity) + if _freja: + return _freja[0] + return None + @property def date_of_birth(self) -> Optional[datetime]: if not self.is_verified: @@ -263,6 +309,9 @@ def date_of_birth(self) -> Optional[datetime]: # SVIPE if self.svipe and self.svipe.is_verified: return self.svipe.date_of_birth + # Freja eID + if self.freja and self.freja.is_verified: + return self.freja.date_of_birth return None def to_frontend_format(self) -> dict[str, Any]: diff --git a/src/eduid/userdb/logs/element.py b/src/eduid/userdb/logs/element.py index 11533ec8c..d5cd6024e 100644 --- a/src/eduid/userdb/logs/element.py +++ b/src/eduid/userdb/logs/element.py @@ -442,6 +442,69 @@ class BankIDProofing(NinEIDProofingLogElement): proofing_method: str = IdentityProofingMethod.BANKID.value +class FrejaEIDNINProofing(NinEIDProofingLogElement): + """ + { + 'eduPersonPrincipalName': eppn, + 'created_ts': utc_now(), + 'created_by': 'application', + 'proofing_method': 'freja_eid', + 'proofing_version': '2024v1', + 'user_id': 'unique identifier for the user', + 'document_type': 'type of document used for identification', + 'document_number': 'document number', + 'nin': 'national_identity_number', + 'given_name': 'name', + 'surname': 'name', + } + + Proofing version history: + 2024v1 - inital deployment + """ + + # unique identifier + user_id: str + # transaction id + transaction_id: str + # document type + document_type: str + # document number + document_number: str + # Proofing method name + proofing_method: str = IdentityProofingMethod.FREJA_EID.value + + +class FrejaEIDForeignProofing(ForeignIdProofingLogElement): + """ + { + 'eduPersonPrincipalName': eppn, + 'created_ts': utc_now(), + 'created_by': 'application', + 'proofing_method': 'freja_eid', + 'proofing_version': '2024v1', + 'user_id': 'unique identifier for the user', + 'document_type': 'type of document used for identification', + 'document_number': 'document number', + 'issuing_country': 'country of issuance', + } + """ + + # unique identifier + user_id: str + # transaction id + transaction_id: str + # document administrative number + administrative_number: Optional[str] + # document type (standardized english) + document_type: str + # document number + document_number: str + # issuing country + issuing_country: str + # Proofing method name + proofing_method: str = IdentityProofingMethod.FREJA_EID.value + + class MFATokenProofing(SwedenConnectProofing): """ { diff --git a/src/eduid/userdb/proofing/db.py b/src/eduid/userdb/proofing/db.py index 2291a5cdb..121023013 100644 --- a/src/eduid/userdb/proofing/db.py +++ b/src/eduid/userdb/proofing/db.py @@ -267,3 +267,8 @@ def __init__(self, db_uri: str, db_name: str = "eduid_svipe_id"): class BankIDProofingUserDB(ProofingUserDB): def __init__(self, db_uri: str, db_name: str = "eduid_bankid"): super().__init__(db_uri, db_name) + + +class FrejaEIDProofingUserDB(ProofingUserDB): + def __init__(self, db_uri: str, db_name: str = "eduid_freja_eid"): + super().__init__(db_uri, db_name) diff --git a/src/eduid/webapp/common/api/schemas/identity.py b/src/eduid/webapp/common/api/schemas/identity.py index a61d49b4e..08442bdd6 100644 --- a/src/eduid/webapp/common/api/schemas/identity.py +++ b/src/eduid/webapp/common/api/schemas/identity.py @@ -23,3 +23,4 @@ class IdentitiesSchema(EduidSchema): nin = fields.Nested(NinIdentitySchema) eidas = fields.Nested(ForeignIdentitySchema) svipe = fields.Nested(ForeignIdentitySchema) + freja = fields.Nested(ForeignIdentitySchema) diff --git a/src/eduid/webapp/common/proofing/methods.py b/src/eduid/webapp/common/proofing/methods.py index d792b520f..15cf140ea 100644 --- a/src/eduid/webapp/common/proofing/methods.py +++ b/src/eduid/webapp/common/proofing/methods.py @@ -14,6 +14,7 @@ from eduid.webapp.common.authn.session_info import SessionInfo from eduid.webapp.common.proofing.messages import ProofingMsg from eduid.webapp.eidas.saml_session_info import ForeignEidSessionInfo, NinSessionInfo +from eduid.webapp.freja_eid.helpers import FrejaEIDDocumentUserInfo, FrejaEIDTokenResponse from eduid.webapp.svipe_id.helpers import SvipeDocumentUserInfo logger = logging.getLogger(__name__) @@ -22,7 +23,9 @@ @dataclass class SessionInfoParseResult: error: Optional[TranslatableMsg] = None - info: Optional[Union[NinSessionInfo, ForeignEidSessionInfo, SvipeDocumentUserInfo, BankIDSessionInfo]] = None + info: Optional[ + Union[NinSessionInfo, ForeignEidSessionInfo, SvipeDocumentUserInfo, BankIDSessionInfo, FrejaEIDDocumentUserInfo] + ] = None @dataclass(frozen=True) @@ -119,11 +122,33 @@ def parse_session_info(self, session_info: SessionInfo, backdoor: bool) -> Sessi return SessionInfoParseResult(info=parsed_session_info) +@dataclass(frozen=True) +class ProofingMethodFrejaEID(ProofingMethod): + def parse_session_info(self, session_info: SessionInfo, backdoor: bool) -> SessionInfoParseResult: + try: + parsed_session_info = FrejaEIDDocumentUserInfo(**session_info) + logger.debug(f"session info: {parsed_session_info}") + except ValidationError: + logger.exception("missing claim in userinfo response") + return SessionInfoParseResult(error=ProofingMsg.attribute_missing) + + # verify session info data + # document should not have expired + expiration_date = parsed_session_info.document.expiration_date + if expiration_date < utc_now().date(): + logger.error(f"Document has expired {expiration_date}") + return SessionInfoParseResult(error=ProofingMsg.session_info_not_valid) + + return SessionInfoParseResult(info=parsed_session_info) + + def get_proofing_method( method: Optional[str], frontend_action: FrontendAction, config: ProofingConfigMixin, -) -> Optional[Union[ProofingMethodFreja, ProofingMethodEidas, ProofingMethodSvipe, ProofingMethodBankID]]: +) -> Optional[ + Union[ProofingMethodFreja, ProofingMethodEidas, ProofingMethodSvipe, ProofingMethodBankID, ProofingMethodFrejaEID] +]: authn_params = config.frontend_action_authn_parameters.get(frontend_action) assert authn_params is not None # please mypy @@ -167,6 +192,12 @@ def get_proofing_method( idp=config.bankid_idp, required_loa=config.bankid_required_loa, ) + if method == "freja_eid": + return ProofingMethodFrejaEID( + method=method, + framework=TrustFramework.FREJA, + finish_url=authn_params.finish_url, + ) logger.warning(f"Unknown proofing method {method}") return None diff --git a/src/eduid/webapp/common/session/eduid_session.py b/src/eduid/webapp/common/session/eduid_session.py index 908051e23..339eeeade 100644 --- a/src/eduid/webapp/common/session/eduid_session.py +++ b/src/eduid/webapp/common/session/eduid_session.py @@ -24,6 +24,7 @@ BankIDNamespace, Common, EidasNamespace, + FrejaEIDNamespace, IdP_Namespace, MfaAction, Phone, @@ -59,6 +60,7 @@ class EduidNamespaces(BaseModel): authn: Optional[AuthnNamespace] = None svipe_id: Optional[SvipeIDNamespace] = None bankid: Optional[BankIDNamespace] = None + freja_eid: Optional[FrejaEIDNamespace] = None class EduidSession(SessionMixin, MutableMapping[str, Any]): @@ -245,6 +247,12 @@ def bankid(self) -> BankIDNamespace: self._namespaces.bankid = BankIDNamespace.from_dict(self._session.get("bankid", {})) return self._namespaces.bankid + @property + def freja_eid(self) -> FrejaEIDNamespace: + if not self._namespaces.freja_eid: + self._namespaces.freja_eid = FrejaEIDNamespace.from_dict(self._session.get("freja_eid", {})) + return self._namespaces.freja_eid + @property def created(self) -> datetime: """ diff --git a/src/eduid/webapp/common/session/namespaces.py b/src/eduid/webapp/common/session/namespaces.py index 42caea6f2..1a4e2f745 100644 --- a/src/eduid/webapp/common/session/namespaces.py +++ b/src/eduid/webapp/common/session/namespaces.py @@ -19,6 +19,7 @@ from eduid.userdb.credentials.external import TrustFramework from eduid.userdb.element import ElementKey from eduid.webapp.common.authn.acs_enums import AuthnAcsAction, BankIDAcsAction, EidasAcsAction +from eduid.webapp.freja_eid.callback_enums import FrejaEIDAction from eduid.webapp.idp.other_device.data import OtherDeviceId from eduid.webapp.svipe_id.callback_enums import SvipeIDAction @@ -245,7 +246,9 @@ class BaseAuthnRequest(BaseModel, ABC): frontend_action: FrontendAction # what action frontend is performing frontend_state: Optional[str] = None # opaque data from frontend, returned in /status method: Optional[str] = None # proofing method that frontend is invoking - post_authn_action: Optional[Union[AuthnAcsAction, EidasAcsAction, SvipeIDAction, BankIDAcsAction]] = None + post_authn_action: Optional[ + Union[AuthnAcsAction, EidasAcsAction, SvipeIDAction, BankIDAcsAction, FrejaEIDAction] + ] = None created_ts: datetime = Field(default_factory=utc_now) authn_instant: Optional[datetime] = None status: Optional[str] = None # populated by the SAML2 ACS/OIDC callback action @@ -332,3 +335,7 @@ class SvipeIDNamespace(SessionNSBase): class BankIDNamespace(SessionNSBase): sp: SPAuthnData = Field(default=SPAuthnData()) + + +class FrejaEIDNamespace(SessionNSBase): + rp: RPAuthnData = Field(default=RPAuthnData()) diff --git a/src/eduid/webapp/freja_eid/__init__.py b/src/eduid/webapp/freja_eid/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/eduid/webapp/freja_eid/app.py b/src/eduid/webapp/freja_eid/app.py new file mode 100644 index 000000000..74aaa17b1 --- /dev/null +++ b/src/eduid/webapp/freja_eid/app.py @@ -0,0 +1,77 @@ +from typing import Any, Mapping, Optional, cast + +from authlib.integrations.flask_client import OAuth +from flask import current_app + +from eduid.common.config.parsers import load_config +from eduid.common.rpc.am_relay import AmRelay +from eduid.userdb.logs import ProofingLog +from eduid.userdb.proofing.db import FrejaEIDProofingUserDB +from eduid.webapp.common.authn.middleware import AuthnBaseApp +from eduid.webapp.freja_eid.helpers import SessionOAuthCache +from eduid.webapp.freja_eid.settings.common import FrejaEIDConfig + +__author__ = "lundberg" + + +class FrejaEIDApp(AuthnBaseApp): + def __init__(self, config: FrejaEIDConfig, **kwargs): + super().__init__(config, **kwargs) + + self.conf = config + # Init dbs + self.private_userdb = FrejaEIDProofingUserDB(self.conf.mongo_uri) + self.proofing_log = ProofingLog(config.mongo_uri) + # Init celery + self.am_relay = AmRelay(config) + + # Initialize the oidc_client + self.oidc_client = OAuth(self, cache=SessionOAuthCache()) + client_kwargs = {} + if self.conf.freja_eid_client.scopes: + client_kwargs["scope"] = " ".join(self.conf.freja_eid_client.scopes) + if self.conf.freja_eid_client.code_challenge_method: + client_kwargs["code_challenge_method"] = self.conf.freja_eid_client.code_challenge_method + authorize_params = {} + if self.conf.freja_eid_client.acr_values: + authorize_params["acr_values"] = " ".join(self.conf.freja_eid_client.acr_values) + self.oidc_client.register( + name="freja_eid", + client_id=self.conf.freja_eid_client.client_id, + client_secret=self.conf.freja_eid_client.client_secret, + client_kwargs=client_kwargs, + authorize_params=authorize_params, + server_metadata_url=f"{self.conf.freja_eid_client.issuer}/.well-known/openid-configuration", + ) + + +current_freja_eid_app = cast(FrejaEIDApp, current_app) + + +def freja_eid_init_app(name: str = "freja_eid", test_config: Optional[Mapping[str, Any]] = None) -> FrejaEIDApp: + """ + :param name: The name of the instance, it will affect the configuration loaded. + :param test_config: Override config. Used in test cases. + + :return: the flask app + """ + config = load_config(typ=FrejaEIDConfig, app_name=name, ns="webapp", test_config=test_config) + + # Load acs actions on app init + from . import callback_actions + + # Make sure pycharm doesn't think the import above is unused and removes it + if callback_actions.__author__: + pass + + app = FrejaEIDApp(config) + + app.logger.info(f"Init {app}...") + + # Register views + from eduid.webapp.freja_eid.views import freja_eid_views + + app.register_blueprint(freja_eid_views) + + app.logger.info(f"{name!s} initialized") + return app diff --git a/src/eduid/webapp/freja_eid/callback_actions.py b/src/eduid/webapp/freja_eid/callback_actions.py new file mode 100644 index 000000000..d92108213 --- /dev/null +++ b/src/eduid/webapp/freja_eid/callback_actions.py @@ -0,0 +1,44 @@ +from eduid.userdb import User +from eduid.webapp.common.api.decorators import require_user +from eduid.webapp.common.authn.acs_registry import ACSArgs, ACSResult, acs_action +from eduid.webapp.common.proofing.messages import ProofingMsg +from eduid.webapp.freja_eid.app import current_freja_eid_app as current_app +from eduid.webapp.freja_eid.callback_enums import FrejaEIDAction +from eduid.webapp.freja_eid.helpers import FrejaEIDDocumentUserInfo, FrejaEIDMsg, FrejaEIDTokenResponse +from eduid.webapp.freja_eid.proofing import get_proofing_functions + +__author__ = "lundberg" + + +@acs_action(FrejaEIDAction.verify_identity) +@require_user +def verify_identity_action(user: User, args: ACSArgs) -> ACSResult: + """ + Use a Freja OIDC userinfo to verify a users' identity. + """ + # please type checking + if not args.proofing_method: + return ACSResult(message=FrejaEIDMsg.method_not_available) + + parsed = args.proofing_method.parse_session_info(args.session_info, backdoor=args.backdoor) + if parsed.error: + return ACSResult(message=parsed.error) + + # please type checking + assert isinstance(parsed.info, FrejaEIDDocumentUserInfo) + + proofing = get_proofing_functions( + session_info=parsed.info, app_name=current_app.conf.app_name, config=current_app.conf, backdoor=args.backdoor + ) + + current = proofing.get_identity(user) + if current and current.is_verified: + current_app.logger.error(f"User already has a verified identity for {args.proofing_method.method}") + current_app.logger.debug(f"Current: {current}. Assertion: {args.session_info}") + return ACSResult(message=ProofingMsg.identity_already_verified) + + verify_result = proofing.verify_identity(user=user) + if verify_result.error is not None: + return ACSResult(message=verify_result.error) + + return ACSResult(success=True, message=FrejaEIDMsg.identity_verify_success) diff --git a/src/eduid/webapp/freja_eid/callback_enums.py b/src/eduid/webapp/freja_eid/callback_enums.py new file mode 100644 index 000000000..fc4b1d36c --- /dev/null +++ b/src/eduid/webapp/freja_eid/callback_enums.py @@ -0,0 +1,8 @@ +from enum import Enum, unique + +__author__ = "lundberg" + + +@unique +class FrejaEIDAction(str, Enum): + verify_identity = "verify-identity-action" diff --git a/src/eduid/webapp/freja_eid/helpers.py b/src/eduid/webapp/freja_eid/helpers.py new file mode 100644 index 000000000..beee42dc3 --- /dev/null +++ b/src/eduid/webapp/freja_eid/helpers.py @@ -0,0 +1,105 @@ +import datetime +import logging +from datetime import date +from enum import Enum, unique +from typing import Any, Optional + +from iso3166 import countries +from pydantic import BaseModel, ConfigDict, Field, field_validator + +from eduid.common.utils import uuid4_str +from eduid.userdb.identity import FrejaRegistrationLevel +from eduid.webapp.common.api.messages import TranslatableMsg +from eduid.webapp.common.session import session + +__author__ = "lundberg" + + +logger = logging.getLogger(__name__) + + +@unique +class FrejaEIDMsg(TranslatableMsg): + """ + Messages sent to the front end with information on the results of the + attempted operations on the back end. + """ + + # failed to create authn request + authn_request_failed = "freja_eid.authn_request_failed" + # Unavailable vetting method requested + method_not_available = "freja_eid.method_not_available" + # Identity verification success + identity_verify_success = "freja_eid.identity_verify_success" + # Authorization error at Freja EID + authorization_error = "freja_eid.authorization_fail" + frontend_action_not_supported = "freja_eid.frontend_action_not_supported" + # registration level not satisfied + registration_level_not_satisfied = "freja_eid.registration_level_not_satisfied" + + +class SessionOAuthCache: + @staticmethod + def get(key: str) -> Any: + logger.debug(f"Getting {key} from session.freja_eid.oauth_cache") + return session.freja_eid.rp.authlib_cache.get(key) + + @staticmethod + def set(key: str, value: Any, expires: Optional[int] = None) -> None: + session.freja_eid.rp.authlib_cache[key] = value + logger.debug(f"Set {key}={value} (expires={expires}) in session.freja_eid.oauth_cache") + + @staticmethod + def delete(key: str) -> None: + del session.freja_eid.rp.authlib_cache[key] + logger.debug(f"Deleted {key} from session.freja_eid.oauth_cache") + + +class UserInfoBase(BaseModel): + aud: str + exp: int + iat: int + iss: str + sub: str + model_config = ConfigDict(extra="allow", populate_by_name=True) + + +class FrejaDocumentType(Enum): + PASSPORT = "PASS" + DRIVING_LICENCE = "DRILIC" + NATIONAL_ID = "NATID" + SIS_CERTIFIED_ID = "IDSIS" + TAX_AGENCY_ID = "TAXID" + OTHER_ID = "OTHERID" + + +class FrejaDocument(BaseModel): + type: FrejaDocumentType + country: str + serial_number: str = Field(alias="serialNumber") + expiration_date: date = Field(alias="expirationDate") + model_config = ConfigDict(populate_by_name=True) + + +class FrejaEIDDocumentUserInfo(UserInfoBase): + country: str = Field(alias="https://frejaeid.com/oidc/claims/country") + document: FrejaDocument = Field(alias="https://frejaeid.com/oidc/claims/document") + family_name: str + given_name: str + name: str + personal_identity_number: Optional[str] = Field( + alias="https:/frejaeid.com/oidc/claims/personalIdentityNumber", default=None + ) + date_of_birth: date = Field(alias="https://frejaeid.com/oidc/claims/birthdate") + registration_level: FrejaRegistrationLevel = Field(alias="https://frejaeid.com/oidc/claims/registrationLevel") + user_id: str = Field(alias="https://frejaeid.com/oidc/claims/relyingPartyUserId") + transaction_id: str = Field(alias="https://frejaeid.com/oidc/claims/transactionReference") + + +class FrejaEIDTokenResponse(BaseModel): + access_token: str + expires_at: int + expires_in: int + id_token: str + token_type: str + userinfo: FrejaEIDDocumentUserInfo diff --git a/src/eduid/webapp/freja_eid/proofing.py b/src/eduid/webapp/freja_eid/proofing.py new file mode 100644 index 000000000..6fd981f2e --- /dev/null +++ b/src/eduid/webapp/freja_eid/proofing.py @@ -0,0 +1,243 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + +from iso3166 import countries +from pymongo.errors import PyMongoError + +from eduid.common.config.base import ProofingConfigMixin +from eduid.common.rpc.exceptions import AmTaskFailed +from eduid.userdb import User +from eduid.userdb.credentials import Credential +from eduid.userdb.element import ElementKey +from eduid.userdb.exceptions import LockedIdentityViolation +from eduid.userdb.identity import ( + FrejaIdentity, + FrejaRegistrationLevel, + IdentityElement, + IdentityProofingMethod, + IdentityType, +) +from eduid.userdb.logs.element import FrejaEIDForeignProofing, FrejaEIDNINProofing, NinProofingLogElement +from eduid.userdb.proofing import NinProofingElement, ProofingUser +from eduid.userdb.proofing.state import NinProofingState +from eduid.webapp.common.api.helpers import set_user_names_from_foreign_id, verify_nin_for_user +from eduid.webapp.common.api.messages import CommonMsg +from eduid.webapp.common.proofing.base import ( + MatchResult, + ProofingElementResult, + ProofingFunctions, + VerifyCredentialResult, + VerifyUserResult, +) +from eduid.webapp.common.proofing.methods import ProofingMethod +from eduid.webapp.freja_eid.app import current_freja_eid_app as current_app +from eduid.webapp.freja_eid.helpers import FrejaEIDDocumentUserInfo, FrejaEIDMsg, FrejaEIDTokenResponse + +__author__ = "lundberg" + + +@dataclass +class FrejaEIDProofingFunctions(ProofingFunctions[FrejaEIDDocumentUserInfo]): + def is_swedish_document(self) -> bool: + issuing_country = countries.get(self.session_info.document.country, None) + sweden = countries.get("SE") + if not sweden: + raise RuntimeError('Could not find country "SE" in iso3166') + if not issuing_country: + raise RuntimeError(f'Could not find country "{self.session_info.document.country}" in iso3166') + if issuing_country == sweden: + return True + return False + + def get_identity(self, user: User) -> Optional[IdentityElement]: + if self.is_swedish_document(): + return user.identities.nin + return user.identities.freja + + def verify_identity(self, user: User) -> VerifyUserResult: + if self.is_swedish_document(): + return self._verify_nin_identity(user) + return self._verify_foreign_identity(user) + + def _verify_nin_identity(self, user: User) -> VerifyUserResult: + proofing_user = ProofingUser.from_user(user, current_app.private_userdb) + + # force user to be registered with PLUS registration level for a NIN identity + if self.session_info.registration_level != FrejaRegistrationLevel.PLUS: + return VerifyUserResult(error=FrejaEIDMsg.registration_level_not_satisfied) + + # Create a proofing log + proofing_log_entry = self.identity_proofing_element(user=user) + if proofing_log_entry.error: + return VerifyUserResult(error=proofing_log_entry.error) + assert isinstance(proofing_log_entry.data, NinProofingLogElement) # please type checking + + # Verify NIN for user + date_of_birth = self.session_info.date_of_birth + try: + nin_element = NinProofingElement( + number=self.session_info.personal_identity_number, + date_of_birth=datetime(year=date_of_birth.year, month=date_of_birth.month, day=date_of_birth.day), + created_by=current_app.conf.app_name, + is_verified=False, + ) + proofing_state = NinProofingState(id=None, modified_ts=None, eppn=proofing_user.eppn, nin=nin_element) + if not verify_nin_for_user(proofing_user, proofing_state, proofing_log_entry.data): + current_app.logger.error(f"Failed verifying NIN for user {proofing_user}") + return VerifyUserResult(error=CommonMsg.temp_problem) + except (AmTaskFailed, PyMongoError): + current_app.logger.exception("Verifying NIN for user failed") + return VerifyUserResult(error=CommonMsg.temp_problem) + except LockedIdentityViolation: + current_app.logger.exception("Verifying NIN for user failed") + return VerifyUserResult(error=CommonMsg.locked_identity_not_matching) + + current_app.stats.count(name="nin_verified") + # re-load the user from central db before returning + _user = current_app.central_userdb.get_user_by_eppn(proofing_user.eppn) + return VerifyUserResult(user=ProofingUser.from_user(_user, current_app.private_userdb)) + + def _verify_foreign_identity(self, user: User) -> VerifyUserResult: + proofing_user = ProofingUser.from_user(user, current_app.private_userdb) + + existing_identity = user.identities.freja + locked_identity = user.locked_identity.freja + + date_of_birth = self.session_info.date_of_birth + new_identity = FrejaIdentity( + personal_identity_number=self.session_info.personal_identity_number, + country_code=self.session_info.document.country, + created_by=current_app.conf.app_name, + date_of_birth=datetime(year=date_of_birth.year, month=date_of_birth.month, day=date_of_birth.day), + is_verified=True, + proofing_method=IdentityProofingMethod.FREJA_EID, + proofing_version=current_app.conf.freja_eid_proofing_version, + user_id=self.session_info.user_id, + registration_level=self.session_info.registration_level, + verified_by=current_app.conf.app_name, + ) + + # check if the just verified identity matches the locked identity + if locked_identity is not None and locked_identity.user_id != new_identity.user_id: + if not self._can_replace_identity(proofing_user=proofing_user): + # asserted identity did not match the locked identity + return VerifyUserResult(error=CommonMsg.locked_identity_not_matching) + # replace the locked identity as the users asserted prid has changed, + # and we are sure enough that it is the same person + proofing_user.locked_identity.replace(element=new_identity) + + # the existing identity is not verified, just remove it + if existing_identity is not None: + proofing_user.identities.remove(key=ElementKey(IdentityType.FREJA)) + + # everything seems to check out, add the new identity to the user + proofing_user.identities.add(element=new_identity) + + # Create a proofing log + proofing_log_entry = self.identity_proofing_element(user=proofing_user) + if proofing_log_entry.error: + return VerifyUserResult(error=proofing_log_entry.error) + assert isinstance(proofing_log_entry.data, FrejaEIDForeignProofing) # please type checking + + # update the users names from the verified identity + proofing_user = set_user_names_from_foreign_id(proofing_user, proofing_log_entry.data) + + # Verify Freja identity for user + if not current_app.proofing_log.save(proofing_log_entry.data): + current_app.logger.error("Failed to save Svipe identity proofing log for user") + return VerifyUserResult(error=CommonMsg.temp_problem) + try: + # Save user to private db + current_app.private_userdb.save(proofing_user) + # Ask am to sync user to central db + current_app.logger.info("Request sync for user") + result = current_app.am_relay.request_user_sync(proofing_user) + current_app.logger.info(f"Sync result for user: {result}") + except AmTaskFailed: + current_app.logger.exception("Verifying Svipe identity for user failed") + return VerifyUserResult(error=CommonMsg.temp_problem) + + current_app.stats.count(name="foreign_identity_verified") + # load the user from central db before returning + _user = current_app.central_userdb.get_user_by_eppn(proofing_user.eppn) + return VerifyUserResult(user=_user) + + def _can_replace_identity(self, proofing_user: ProofingUser) -> bool: + locked_identity = proofing_user.locked_identity.freja + if locked_identity is None: + return True + # Freja relyingPartyUserId should stay the same, but it is not impossible that it has changed + # try to verify that it is the same person with a new Freja relyingPartyUserId + date_of_birth_matches = locked_identity.date_of_birth.date() == self.session_info.date_of_birth + given_name_matches = proofing_user.given_name == self.session_info.given_name + surname_matches = proofing_user.surname == self.session_info.family_name + if date_of_birth_matches and given_name_matches and surname_matches: + return True + return False + + def identity_proofing_element(self, user: User) -> ProofingElementResult: + if self.backdoor: + # TODO: implement backdoor support? + pass + + if self.is_swedish_document(): + return self._nin_identity_proofing_element(user) + return self._foreign_identity_proofing_element(user) + + def _nin_identity_proofing_element(self, user: User) -> ProofingElementResult: + _nin = self.session_info.personal_identity_number + if not _nin: + return ProofingElementResult(error=CommonMsg.nin_invalid) + + data = FrejaEIDNINProofing( + created_by=current_app.conf.app_name, + eppn=user.eppn, + nin=_nin, + given_name=self.session_info.given_name, + surname=self.session_info.family_name, + user_id=self.session_info.user_id, + transaction_id=self.session_info.transaction_id, + document_type=self.session_info.document.type.value, # according to Freja naming convention + document_number=self.session_info.document.serial_number, + proofing_version=current_app.conf.freja_eid_proofing_version, + ) + return ProofingElementResult(data=data) + + def _foreign_identity_proofing_element(self, user: User) -> ProofingElementResult: + # The top-level LogElement class won't allow empty strings (and not None either) + _non_empty_identity_number = self.session_info.personal_identity_number or "freja_no_identity_number_provided" + data = FrejaEIDForeignProofing( + created_by=current_app.conf.app_name, + eppn=user.eppn, + user_id=self.session_info.user_id, + transaction_id=self.session_info.transaction_id, + document_type=self.session_info.document.type.value, # according to Freja naming convention + document_number=self.session_info.document.serial_number, + proofing_version=current_app.conf.freja_eid_proofing_version, + given_name=self.session_info.given_name, + surname=self.session_info.family_name, + date_of_birth=self.session_info.date_of_birth.isoformat(), + country_code=self.session_info.country, + administrative_number=_non_empty_identity_number, + issuing_country=self.session_info.document.country, + ) + return ProofingElementResult(data=data) + + def match_identity(self, user: User, proofing_method: ProofingMethod) -> MatchResult: + raise NotImplementedError("No support for mfa") + + def credential_proofing_element(self, user: User, credential: Credential) -> ProofingElementResult: + raise NotImplementedError("No support for credential proofing") + + def mark_credential_as_verified(self, credential: Credential, loa: Optional[str]) -> VerifyCredentialResult: + raise NotImplementedError("No support for credential proofing") + + +def get_proofing_functions( + session_info: FrejaEIDDocumentUserInfo, + app_name: str, + config: ProofingConfigMixin, + backdoor: bool, +) -> ProofingFunctions[FrejaEIDDocumentUserInfo]: + return FrejaEIDProofingFunctions(session_info=session_info, app_name=app_name, config=config, backdoor=backdoor) diff --git a/src/eduid/webapp/freja_eid/run.py b/src/eduid/webapp/freja_eid/run.py new file mode 100644 index 000000000..b9511ef5e --- /dev/null +++ b/src/eduid/webapp/freja_eid/run.py @@ -0,0 +1,8 @@ +from eduid.webapp.freja_eid.app import freja_eid_init_app + +app = freja_eid_init_app() + + +if __name__ == "__main__": + app.logger.info(f"Starting {app}...") + app.run() diff --git a/src/eduid/webapp/freja_eid/schemas.py b/src/eduid/webapp/freja_eid/schemas.py new file mode 100644 index 000000000..d2932aa84 --- /dev/null +++ b/src/eduid/webapp/freja_eid/schemas.py @@ -0,0 +1,37 @@ +from marshmallow import fields + +from eduid.webapp.common.api.schemas.base import EduidSchema, FluxStandardAction +from eduid.webapp.common.api.schemas.csrf import CSRFRequestMixin, CSRFResponseMixin + +__author__ = "lundberg" + + +class FrejaEIDStatusRequestSchema(EduidSchema, CSRFRequestMixin): + authn_id = fields.String(required=False) + + +class FrejaEIDStatusResponseSchema(EduidSchema, CSRFResponseMixin): + class StatusResponsePayload(EduidSchema, CSRFResponseMixin): + authn_id = fields.String(required=True) + frontend_action = fields.String(required=True) + frontend_state = fields.String(required=False) + method = fields.String(required=True) + error = fields.Boolean(required=False) + status = fields.String(required=False) + + payload = fields.Nested(StatusResponsePayload) + + +class FrejaEIDCommonRequestSchema(EduidSchema, CSRFRequestMixin): + """A verify request for either an identity or a credential proofing.""" + + method = fields.String(required=True) + frontend_action = fields.String(required=True) + frontend_state = fields.String(required=False) + + +class FrejaEIDCommonResponseSchema(FluxStandardAction): + class VerifyResponsePayload(EduidSchema, CSRFResponseMixin): + location = fields.String(required=False) + + payload = fields.Nested(VerifyResponsePayload) diff --git a/src/eduid/webapp/freja_eid/settings/__init__.py b/src/eduid/webapp/freja_eid/settings/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/eduid/webapp/freja_eid/settings/common.py b/src/eduid/webapp/freja_eid/settings/common.py new file mode 100644 index 000000000..5846c1996 --- /dev/null +++ b/src/eduid/webapp/freja_eid/settings/common.py @@ -0,0 +1,46 @@ +from typing import Union + +from pydantic import AnyUrl, BaseModel, Field + +from eduid.common.clients.oidc_client.base import AuthlibClientConfig +from eduid.common.config.base import ( + AmConfigMixin, + EduIDBaseAppConfig, + ErrorsConfigMixin, + FrontendActionMixin, + MagicCookieMixin, + ProofingConfigMixin, +) + +__author__ = "lundberg" + + +class FrejaEIDClientConfig(AuthlibClientConfig): + acr_values: list[str] = Field(default=[]) + scopes: list[str] = Field( + default=[ + "openid", + "profile", + "https://frejaeid.com/oidc/scopes/personalIdentityNumber", + "https://frejaeid.com/oidc/scopes/document", + "https://frejaeid.com/oidc/scopes/registrationLevel", + "https://frejaeid.com/oidc/scopes/relyingPartyUserId", + ] + ) + claims_request: dict[str, Union[None, dict[str, bool]]] = Field(default={}) + + +class FrejaEIDConfig( + EduIDBaseAppConfig, + AmConfigMixin, + ProofingConfigMixin, + ErrorsConfigMixin, + MagicCookieMixin, + FrontendActionMixin, +): + """ + Configuration for the svipe_id app + """ + + app_name: str = "freja_eid" + freja_eid_client: FrejaEIDClientConfig diff --git a/src/eduid/webapp/freja_eid/tests/__init__.py b/src/eduid/webapp/freja_eid/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/eduid/webapp/freja_eid/tests/test_app.py b/src/eduid/webapp/freja_eid/tests/test_app.py new file mode 100644 index 000000000..7043cc8d7 --- /dev/null +++ b/src/eduid/webapp/freja_eid/tests/test_app.py @@ -0,0 +1,661 @@ +import json +from datetime import date, datetime, timedelta +from typing import Any, Optional +from unittest.mock import MagicMock, patch +from urllib.parse import parse_qs, urlparse + +from flask import url_for +from iso3166 import Country, countries + +from eduid.common.config.base import FrontendAction +from eduid.common.misc.timeutil import utc_now +from eduid.userdb.identity import FrejaIdentity, FrejaRegistrationLevel, IdentityProofingMethod +from eduid.webapp.common.api.messages import CommonMsg +from eduid.webapp.common.proofing.messages import ProofingMsg +from eduid.webapp.common.proofing.testing import ProofingTests +from eduid.webapp.freja_eid.app import FrejaEIDApp, freja_eid_init_app +from eduid.webapp.freja_eid.helpers import FrejaDocument, FrejaDocumentType, FrejaEIDDocumentUserInfo, FrejaEIDMsg + +__author__ = "lundberg" + + +class FrejaEIDTests(ProofingTests[FrejaEIDApp]): + """Base TestCase for those tests that need a full environment setup""" + + def setUp(self, *args, **kwargs): + super().setUp(*args, **kwargs, users=["hubba-bubba", "hubba-baar"]) + + self.unverified_test_user = self.app.central_userdb.get_user_by_eppn("hubba-baar") + self._user_setup() + + self.default_frontend_data = { + "method": "freja_eid", + "frontend_action": "verifyIdentity", + "frontend_state": "test_state", + } + + self.oidc_provider_config = { + "response_types_supported": ["code"], + "request_parameter_supported": True, + "request_uri_parameter_supported": False, + "userinfo_encryption_alg_values_supported": ["none"], + "claims_parameter_supported": False, + "grant_types_supported": ["authorization_code"], + "scopes_supported": [ + "openid", + "email", + "profile", + "https://frejaeid.com/oidc/scopes/age", + "https://frejaeid.com/oidc/scopes/personalIdentityNumber", + "https://frejaeid.com/oidc/scopes/organisationId", + "phone", + "https://frejaeid.com/oidc/scopes/allPhoneNumbers", + "https://frejaeid.com/oidc/scopes/covidCertificate", + "https://frejaeid.com/oidc/scopes/document", + "https://frejaeid.com/oidc/scopes/registrationLevel", + "https://frejaeid.com/oidc/scopes/allEmailAddresses", + "https://frejaeid.com/oidc/scopes/relyingPartyUserId", + "https://frejaeid.com/oidc/scopes/integratorSpecificUserId", + "https://frejaeid.com/oidc/scopes/customIdentifier", + "https://frejaeid.com/oidc/scopes/addresses", + "address", + ], + "issuer": "https://example.com/op/oidc/", + "authorization_endpoint": "https://example.com/op/oidc/authorize", + "userinfo_endpoint": "https://example.com/op/oidc/userinfo", + "token_endpoint_auth_signing_alg_values_supported": ["RS256"], + "userinfo_signing_alg_values_supported": ["none"], + "claims_supported": [ + "email", + "email_verified", + "name", + "given_name", + "family_name", + "https://frejaeid.com/oidc/claims/age", + "https://frejaeid.com/oidc/claims/personalIdentityNumber", + "https://frejaeid.com/oidc/claims/country", + "https://frejaeid.com/oidc/claims/organisationIdIdentifier", + "https://frejaeid.com/oidc/claims/organisationIdAdditionalAttributes", + "phone_number", + "phone_number_verified", + "https://frejaeid.com/oidc/claims/allPhoneNumbers", + "https://frejaeid.com/oidc/claims/covidCertificate", + "https://frejaeid.com/oidc/claims/document", + "https://frejaeid.com/oidc/claims/registrationLevel", + "https://frejaeid.com/oidc/claims/allEmailAddresses", + "https://frejaeid.com/oidc/claims/relyingPartyUserId", + "https://frejaeid.com/oidc/claims/integratorSpecificUserId", + "https://frejaeid.com/oidc/claims/customIdentifier", + "https://frejaeid.com/oidc/claims/addresses", + "address", + ], + "require_request_uri_registration": True, + "code_challenge_methods_supported": ["plain", "S256"], + "jwks_uri": "https://example.com/op/oidc/jwk", + "subject_types_supported": ["public"], + "id_token_signing_alg_values_supported": ["RS256"], + "claim_types_supported": ["normal"], + "token_endpoint_auth_methods_supported": ["client_secret_post", "client_secret_basic"], + "request_object_signing_alg_values_supported": ["RS256", "none"], + "request_object_encryption_alg_values_supported": ["RSA1_5", "RSA-OAEP-256"], + "token_endpoint": "https://example.com/op/oidc/token", + } + + def load_app(self, config: dict[str, Any]) -> FrejaEIDApp: + """ + Called from the parent class, so we can provide the appropriate flask + app for this test case. + """ + return freja_eid_init_app("testing", config) + + def update_config(self, config: dict[str, Any]): + config.update( + { + "freja_eid_client": { + "client_id": "test_client_id", + "client_secret": "test_client_secret", + "issuer": "https://example.com/op/oidc", + }, + "frontend_action_authn_parameters": { + FrontendAction.VERIFY_IDENTITY.value: { + "force_authn": True, + "finish_url": "https://dashboard.example.com/profile/ext-return/{app_name}/{authn_id}", + }, + }, + } + ) + return config + + def _user_setup(self): + # remove any freja eid identity that already exists, we want to handle those ourselves + for eppn in [self.test_user.eppn, self.unverified_test_user.eppn]: + user = self.app.central_userdb.get_user_by_eppn(eppn) + if user.identities.freja: + user.identities.remove(user.identities.freja.key) + self.app.central_userdb.save(user) + + @staticmethod + def get_mock_userinfo( + issuing_country: Country, + personal_identity_number: Optional[str] = "123456789", + registration_level: FrejaRegistrationLevel = FrejaRegistrationLevel.EXTENDED, + birthdate: date = date(year=1901, month=2, day=3), + freja_user_id: str = "unique_freja_eid", + transaction_id: str = "unique_transaction_id", + given_name: str = "Test", + family_name: str = "Testsson", + now: datetime = utc_now(), + userinfo_expires: Optional[datetime] = None, + document_expires: Optional[datetime] = None, + ) -> FrejaEIDDocumentUserInfo: + if userinfo_expires is None: + userinfo_expires = now + timedelta(minutes=5) + if document_expires is None: + document_expires = now + timedelta(days=1095) # 3 years + + return FrejaEIDDocumentUserInfo( + aud="test", + exp=int(userinfo_expires.timestamp()), + iat=int(now.timestamp()), + iss="test", + sub=freja_user_id, + date_of_birth=birthdate, + family_name=family_name, + given_name=given_name, + name=f"{given_name} {family_name}", + country=issuing_country.alpha2, + document=FrejaDocument( + type=FrejaDocumentType.PASSPORT, + country=issuing_country.alpha2, + serial_number="1234567890", + expiration_date=document_expires.date(), + ), + personal_identity_number=personal_identity_number, + user_id=freja_user_id, + registration_level=registration_level, + transaction_id=transaction_id, + ) + + @staticmethod + def _get_state_and_nonce(auth_url: str) -> tuple[str, str]: + auth_url_query = urlparse(auth_url).query + return parse_qs(auth_url_query)["state"][0], parse_qs(auth_url_query)["nonce"][0] + + @patch("authlib.integrations.requests_client.oauth2_session.OAuth2Session.request") + @patch("authlib.integrations.base_client.sync_openid.OpenIDMixin.parse_id_token") + @patch("authlib.integrations.base_client.sync_openid.OpenIDMixin.userinfo") + @patch("authlib.integrations.base_client.sync_app.OAuth2Mixin.fetch_access_token") + @patch("authlib.integrations.base_client.sync_app.OAuth2Mixin.load_server_metadata") + def mock_authorization_callback( + self, + mock_metadata: MagicMock, + mock_fetch_access_token: MagicMock, + mock_userinfo: MagicMock, + mock_parse_id_token: MagicMock, + mock_end_session: MagicMock, + state: str, + nonce: str, + userinfo: FrejaEIDDocumentUserInfo, + ): + with self.app.test_request_context(): + endpoint = url_for("freja_eid.authn_callback") + + mock_metadata.return_value = self.oidc_provider_config + mock_end_session.return_value = True + + id_token = json.dumps( + { + "nonce": nonce, + "sub": "sub", + "iss": "iss", + "aud": ["aud"], + "exp": userinfo.exp, + "iat": userinfo.iat, + "auth_time": userinfo.iat, + "acr": "acr", + "amr": ["amr"], + "azp": "azp", + } + ) + mock_fetch_access_token.return_value = { + "access_token": "access_token", + "token_type": "token_type", + "expires_in": timedelta(minutes=5).total_seconds(), + "expires_at": userinfo.exp, + "refresh_token": "refresh_token", + "id_token": id_token, + } + + mock_parse_id_token.return_value = userinfo.model_dump() + mock_userinfo.return_value = userinfo.model_dump() + return self.browser.get(f"{endpoint}?id_token=id_token&state={state}&code=mock_code") + + @patch("authlib.integrations.base_client.sync_app.OAuth2Mixin.load_server_metadata") + def _start_auth(self, mock_metadata: MagicMock, endpoint: str, data: dict[str, Any], eppn: str): + mock_metadata.return_value = self.oidc_provider_config + + with self.session_cookie(self.browser, eppn) as client: + with client.session_transaction() as sess: + csrf_token = sess.get_csrf_token() + _data = { + "csrf_token": csrf_token, + } + _data.update(data) + return client.post(endpoint, json=_data) + + def test_app_starts(self): + assert self.app.conf.app_name == "testing" + + def test_authenticate(self): + response = self.browser.get("/") + self.assertEqual(response.status_code, 302) # Redirect to token service + with self.session_cookie(self.browser, self.test_user.eppn) as browser: + response = browser.get("/") + self._check_success_response(response, type_="GET_FREJA_EID_SUCCESS") + + def test_verify_identity_request(self): + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_identity") + + response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=self.test_user.eppn) + assert response.status_code == 200 + self._check_success_response(response, type_="POST_FREJA_EID_VERIFY_IDENTITY_SUCCESS") + assert self.get_response_payload(response)["location"].startswith("https://example.com/op/oidc/authorize") + query: dict[str, list[str]] = parse_qs(urlparse(self.get_response_payload(response)["location"]).query) # type: ignore + assert query["response_type"] == ["code"] + assert query["client_id"] == ["test_client_id"] + assert query["redirect_uri"] == ["http://test.localhost/authn-callback"] + assert query["scope"] == [ + " ".join(self.app.conf.freja_eid_client.scopes) + ], f"{query['scope']} != {[' '.join(self.app.conf.freja_eid_client.scopes)]}" + assert query["code_challenge_method"] == ["S256"] + + @patch("eduid.common.rpc.msg_relay.MsgRelay.get_all_navet_data") + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_nin_identity(self, mock_request_user_sync: MagicMock, mock_get_all_navet_data: MagicMock): + mock_get_all_navet_data.return_value = self._get_all_navet_data() + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.unverified_test_user.eppn + country = countries.get("Sweden") + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_identity") + + start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + + userinfo = self.get_mock_userinfo(issuing_country=country, registration_level=FrejaRegistrationLevel.PLUS) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), + frontend_state=self.default_frontend_data["frontend_state"], + method=self.default_frontend_data["method"], + expect_msg=FrejaEIDMsg.identity_verify_success, + ) + + user = self.app.central_userdb.get_user_by_eppn(eppn) + self._verify_user_parameters( + eppn, + identity_verified=True, + num_proofings=1, + num_mfa_tokens=0, + locked_identity=user.identities.nin, + proofing_method=IdentityProofingMethod.FREJA_EID, + proofing_version=self.app.conf.freja_eid_proofing_version, + ) + + # check names + assert user.given_name == userinfo.given_name + assert user.surname == userinfo.family_name + # check proofing log + doc = self.app.proofing_log._get_documents_by_attr(attr="eduPersonPrincipalName", value=eppn)[0] + assert doc["given_name"] == userinfo.given_name + assert doc["surname"] == userinfo.family_name + + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_foreign_identity(self, mock_request_user_sync: MagicMock): + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.unverified_test_user.eppn + country = countries.get("Denmark") + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_identity") + + start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + userinfo = self.get_mock_userinfo(issuing_country=country) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), + frontend_state=self.default_frontend_data["frontend_state"], + method=self.default_frontend_data["method"], + expect_msg=FrejaEIDMsg.identity_verify_success, + ) + + user = self.app.central_userdb.get_user_by_eppn(eppn) + self._verify_user_parameters( + eppn, + identity_verified=True, + num_proofings=1, + num_mfa_tokens=0, + locked_identity=user.identities.freja, + proofing_method=IdentityProofingMethod.FREJA_EID, + proofing_version=self.app.conf.freja_eid_proofing_version, + ) + + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_foreign_identity_no_identity_number(self, mock_request_user_sync: MagicMock): + """Not all countries have something like a Swedish NIN, so personal_identity_number may be None""" + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.unverified_test_user.eppn + country = countries.get("Denmark") + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_identity") + + start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + userinfo = self.get_mock_userinfo(issuing_country=country, personal_identity_number=None) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), + frontend_state=self.default_frontend_data["frontend_state"], + method=self.default_frontend_data["method"], + expect_msg=FrejaEIDMsg.identity_verify_success, + ) + + user = self.app.central_userdb.get_user_by_eppn(eppn) + + assert user.identities.freja is not None + assert user.identities.freja.personal_identity_number is None + + self._verify_user_parameters( + eppn, + identity_verified=True, + num_proofings=1, + num_mfa_tokens=0, + locked_identity=user.identities.freja, + proofing_method=IdentityProofingMethod.FREJA_EID, + proofing_version=self.app.conf.freja_eid_proofing_version, + ) + + @patch("eduid.common.rpc.msg_relay.MsgRelay.get_all_navet_data") + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_nin_identity_already_verified( + self, mock_request_user_sync: MagicMock, mock_get_all_navet_data: MagicMock + ): + mock_get_all_navet_data.return_value = self._get_all_navet_data() + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.test_user.eppn + country = countries.get("Sweden") + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_identity") + + start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + userinfo = self.get_mock_userinfo(issuing_country=country) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), + frontend_state=self.default_frontend_data["frontend_state"], + method=self.default_frontend_data["method"], + expect_error=True, + expect_msg=ProofingMsg.identity_already_verified, + ) + self._verify_user_parameters(eppn, identity_verified=True, num_proofings=0, num_mfa_tokens=0) + + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_foreign_identity_already_verified(self, mock_request_user_sync: MagicMock): + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.test_user.eppn + country = countries.get("Denmark") + + # add a verified freja eid identity + user = self.app.central_userdb.get_user_by_eppn(eppn) + userinfo = self.get_mock_userinfo(issuing_country=country) + user.identities.add( + FrejaIdentity( + personal_identity_number=userinfo.personal_identity_number, + country_code=country.alpha2, + date_of_birth=datetime.combine(userinfo.date_of_birth, datetime.min.time()), + is_verified=True, + user_id=userinfo.user_id, + registration_level=userinfo.registration_level, + ) + ) + self.request_user_sync(user) + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_identity") + + start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + userinfo = self.get_mock_userinfo(issuing_country=country) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), + frontend_state=self.default_frontend_data["frontend_state"], + method=self.default_frontend_data["method"], + expect_error=True, + expect_msg=ProofingMsg.identity_already_verified, + ) + self._verify_user_parameters(eppn, identity_verified=True, num_proofings=0, num_mfa_tokens=0) + + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_foreign_identity_replace_locked_identity(self, mock_request_user_sync: MagicMock): + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.test_user.eppn + country = countries.get("Denmark") + + # add a locked freja eid identity that will match the new identity + user = self.app.central_userdb.get_user_by_eppn(eppn) + userinfo = self.get_mock_userinfo(issuing_country=country) + user.locked_identity.add( + FrejaIdentity( + personal_identity_number=userinfo.personal_identity_number, + country_code="DK", + date_of_birth=datetime.combine(userinfo.date_of_birth, datetime.min.time()), + is_verified=True, + user_id="another_freja_eid", + registration_level=userinfo.registration_level, + ) + ) + user.given_name = userinfo.given_name + user.surname = userinfo.family_name + self.app.central_userdb.save(user) + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_identity") + + start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), + frontend_state=self.default_frontend_data["frontend_state"], + method=self.default_frontend_data["method"], + expect_msg=FrejaEIDMsg.identity_verify_success, + ) + new_locked_identity = FrejaIdentity( + personal_identity_number=userinfo.personal_identity_number, + country_code="DK", + date_of_birth=datetime.combine(userinfo.date_of_birth, datetime.min.time()), + user_id=userinfo.user_id, + registration_level=userinfo.registration_level, + ) + self._verify_user_parameters( + eppn, identity_verified=True, num_proofings=1, num_mfa_tokens=0, locked_identity=new_locked_identity + ) + + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_foreign_identity_replace_locked_identity_fail(self, mock_request_user_sync: MagicMock): + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.unverified_test_user.eppn + country = countries.get("Denmark") + + userinfo = self.get_mock_userinfo(issuing_country=country) + + # add a locked freja eid identity that will NOT match the new identity + user = self.app.central_userdb.get_user_by_eppn(eppn) + user.locked_identity.add( + FrejaIdentity( + personal_identity_number=userinfo.personal_identity_number, + country_code=userinfo.document.country, + date_of_birth=datetime.today(), # not matching the new identity + is_verified=True, + user_id="another_freja_eid", + registration_level=userinfo.registration_level, + ) + ) + self.app.central_userdb.save(user) + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_identity") + + start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + userinfo = self.get_mock_userinfo(issuing_country=country) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), + frontend_state=self.default_frontend_data["frontend_state"], + method=self.default_frontend_data["method"], + expect_error=True, + expect_msg=CommonMsg.locked_identity_not_matching, + ) + self._verify_user_parameters( + eppn, identity_verified=False, num_proofings=0, num_mfa_tokens=0, locked_identity=user.locked_identity.freja + ) + + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_foreign_identity_replace_locked_identity_fail_personal_id_number( + self, mock_request_user_sync: MagicMock + ): + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.unverified_test_user.eppn + country = countries.get("Denmark") + personal_identity_number = "1234567890" + other_admin_number = "0987654321" + + userinfo = self.get_mock_userinfo(issuing_country=country, personal_identity_number=personal_identity_number) + + # add a locked freja eid identity that will NOT match the new identity + user = self.app.central_userdb.get_user_by_eppn(eppn) + user.locked_identity.add( + FrejaIdentity( + personal_identity_number=other_admin_number, # not matching the new identity + country_code=userinfo.document.country, + date_of_birth=datetime.combine(userinfo.date_of_birth, datetime.min.time()), + is_verified=True, + user_id="another_freja_eid", + registration_level=userinfo.registration_level, + ) + ) + self.app.central_userdb.save(user) + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_identity") + + start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + userinfo = self.get_mock_userinfo(issuing_country=country) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), + frontend_state=self.default_frontend_data["frontend_state"], + method=self.default_frontend_data["method"], + expect_error=True, + expect_msg=CommonMsg.locked_identity_not_matching, + ) + self._verify_user_parameters( + eppn, identity_verified=False, num_proofings=0, num_mfa_tokens=0, locked_identity=user.locked_identity.freja + ) + + @patch("eduid.common.rpc.msg_relay.MsgRelay.get_all_navet_data") + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_foreign_identity_already_verified_nin( + self, mock_request_user_sync: MagicMock, mock_get_all_navet_data: MagicMock + ): + mock_get_all_navet_data.return_value = self._get_all_navet_data() + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.test_user.eppn + country = countries.get("Denmark") + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_identity") + + start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + userinfo = self.get_mock_userinfo(issuing_country=country) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), + frontend_state=self.default_frontend_data["frontend_state"], + method=self.default_frontend_data["method"], + expect_msg=FrejaEIDMsg.identity_verify_success, + ) + user = self.app.central_userdb.get_user_by_eppn(eppn) + self._verify_user_parameters( + eppn, identity_verified=True, num_proofings=1, num_mfa_tokens=0, locked_identity=user.identities.freja + ) + self._verify_user_parameters( + eppn, identity_verified=True, num_proofings=1, num_mfa_tokens=0, locked_identity=user.identities.nin + ) + + @patch("eduid.common.rpc.msg_relay.MsgRelay.get_all_navet_data") + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_verify_identity_expired_document( + self, mock_request_user_sync: MagicMock, mock_get_all_navet_data: MagicMock + ): + mock_get_all_navet_data.return_value = self._get_all_navet_data() + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.unverified_test_user.eppn + country = countries.get("Sweden") + + with self.app.test_request_context(): + endpoint = url_for("freja_eid.verify_identity") + + start_auth_response = self._start_auth(endpoint=endpoint, data=self.default_frontend_data, eppn=eppn) + state, nonce = self._get_state_and_nonce(self.get_response_payload(start_auth_response)["location"]) + yesterday = utc_now() - timedelta(days=1) + userinfo = self.get_mock_userinfo(issuing_country=country, document_expires=yesterday) + response = self.mock_authorization_callback(state=state, nonce=nonce, userinfo=userinfo) + assert response.status_code == 302 + self._verify_status( + finish_url=response.headers["Location"], + frontend_action=FrontendAction(self.default_frontend_data["frontend_action"]), + frontend_state=self.default_frontend_data["frontend_state"], + method=self.default_frontend_data["method"], + expect_error=True, + expect_msg=ProofingMsg.session_info_not_valid, + ) + self._verify_user_parameters(eppn, identity_verified=False, num_proofings=0, num_mfa_tokens=0) diff --git a/src/eduid/webapp/freja_eid/views.py b/src/eduid/webapp/freja_eid/views.py new file mode 100644 index 000000000..c2659955c --- /dev/null +++ b/src/eduid/webapp/freja_eid/views.py @@ -0,0 +1,214 @@ +import json +from dataclasses import dataclass +from typing import Optional +from urllib.parse import parse_qs, urlparse + +from authlib.integrations.base_client import OAuthError +from flask import Blueprint, make_response, redirect, request, url_for +from werkzeug import Response as WerkzeugResponse + +from eduid.common.config.base import FrontendAction +from eduid.userdb import User +from eduid.webapp.common.api.decorators import MarshalWith, UnmarshalWith, require_user +from eduid.webapp.common.api.errors import EduidErrorsContext, goto_errors_response +from eduid.webapp.common.api.helpers import check_magic_cookie +from eduid.webapp.common.api.messages import AuthnStatusMsg, FluxData, TranslatableMsg, error_response, success_response +from eduid.webapp.common.api.schemas.authn_status import StatusRequestSchema, StatusResponseSchema +from eduid.webapp.common.api.schemas.csrf import EmptyResponse +from eduid.webapp.common.authn.acs_registry import ACSArgs, get_action +from eduid.webapp.common.proofing.methods import get_proofing_method +from eduid.webapp.common.session import session +from eduid.webapp.common.session.namespaces import OIDCState, RP_AuthnRequest +from eduid.webapp.freja_eid.app import current_freja_eid_app as current_app +from eduid.webapp.freja_eid.callback_enums import FrejaEIDAction +from eduid.webapp.freja_eid.helpers import FrejaEIDMsg +from eduid.webapp.freja_eid.schemas import FrejaEIDCommonRequestSchema, FrejaEIDCommonResponseSchema + +__author__ = "lundberg" + + +freja_eid_views = Blueprint("freja_eid", __name__, url_prefix="") + + +@freja_eid_views.route("/", methods=["GET"]) +@MarshalWith(EmptyResponse) +@require_user +def index(user: User) -> FluxData: + return success_response(payload=None, message=None) + + +@freja_eid_views.route("/get-status", methods=["POST"]) +@UnmarshalWith(StatusRequestSchema) +@MarshalWith(StatusResponseSchema) +def get_status(authn_id: OIDCState) -> FluxData: + authn = session.freja_eid.rp.authns.get(authn_id) + if not authn: + return error_response(message=AuthnStatusMsg.not_found) + + payload = { + "authn_id": str(authn_id), + "frontend_action": authn.frontend_action.value, + "frontend_state": authn.frontend_state, + "method": authn.method, + "error": bool(authn.error), + } + if authn.status is not None: + payload["status"] = authn.status + + return success_response(payload=payload) + + +@freja_eid_views.route("/verify-identity", methods=["POST"]) +@UnmarshalWith(FrejaEIDCommonRequestSchema) +@MarshalWith(FrejaEIDCommonResponseSchema) +@require_user +def verify_identity(user: User, method: str, frontend_action: str, frontend_state: Optional[str] = None) -> FluxData: + res = _authn(FrejaEIDAction.verify_identity, method, frontend_action, frontend_state) + if res.error: + current_app.logger.error(f"Failed to start verify identity: {res.error}") + return error_response(message=res.error) + return success_response(payload={"location": res.url}) + + +@dataclass +class AuthnResult: + authn_req: Optional[RP_AuthnRequest] = None + authn_id: Optional[OIDCState] = None + error: Optional[TranslatableMsg] = None + url: Optional[str] = None + + +def _authn( + action: FrejaEIDAction, + method: str, + frontend_action: str, + frontend_state: Optional[str] = None, +) -> AuthnResult: + current_app.logger.debug(f"Requested method: {method}, frontend action: {frontend_action}") + + try: + _frontend_action = FrontendAction(frontend_action) + authn_params = current_app.conf.frontend_action_authn_parameters[_frontend_action] + except (ValueError, KeyError): + current_app.logger.exception(f"Frontend action {frontend_action} not supported") + return AuthnResult(error=FrejaEIDMsg.frontend_action_not_supported) + + try: + auth_redirect = current_app.oidc_client.freja_eid.authorize_redirect( + redirect_uri=url_for("freja_eid.authn_callback", _external=True), + ) + except OAuthError: + current_app.logger.exception("Failed to create authorization request") + return AuthnResult(error=FrejaEIDMsg.authn_request_failed) + + auth_url = auth_redirect.headers["Location"] + auth_url_query = urlparse(auth_url).query + try: + # Ignore PyCharm warning "Expected type 'bytes' ..." for "state" lookup + state = parse_qs(auth_url_query)["state"][0] + except KeyError: + current_app.logger.error(f'Failed to parse "state" from authn request: {auth_url_query}') + return AuthnResult(error=FrejaEIDMsg.authn_request_failed) + + proofing_method = get_proofing_method(method, _frontend_action, current_app.conf) + if not proofing_method: + current_app.logger.error(f"Unknown method: {method}") + return AuthnResult(error=FrejaEIDMsg.method_not_available) + + authn_req = RP_AuthnRequest( + authn_id=OIDCState(state), + frontend_action=_frontend_action, + frontend_state=frontend_state, + post_authn_action=action, + method=proofing_method.method, + finish_url=authn_params.finish_url, + ) + session.freja_eid.rp.authns[authn_req.authn_id] = authn_req + current_app.logger.debug(f"Stored RP_AuthnRequest[{authn_req.authn_id}]: {authn_req}") + current_app.logger.debug(f"returning url: {auth_url}") + return AuthnResult(authn_id=authn_req.authn_id, url=auth_url, authn_req=authn_req) + + +@freja_eid_views.route("/authn-callback", methods=["GET"]) +@require_user +def authn_callback(user) -> WerkzeugResponse: + """ + This is the callback endpoint for the Svipe ID OIDC flow. + """ + current_app.logger.debug("authn_callback called") + current_app.logger.debug(f"request.args: {request.args}") + authn_req = None + oidc_state: Optional[OIDCState] = None + if "state" in request.args: + oidc_state = OIDCState(request.args["state"]) + if oidc_state is not None: + authn_req = session.freja_eid.rp.authns.get(oidc_state) + + if not oidc_state or not authn_req: + # Perhaps an authn response received out of order - abort without destroying state + # (User makes two requests, A and B. Response B arrives, user is happy and proceeds with their work. + # Then response A arrives late. Just silently abort, no need to mess up the users' session.) + current_app.logger.info( + f"Response {oidc_state} does not match one in session, redirecting user to eduID Errors page" + ) + if not current_app.conf.errors_url_template: + return make_response("Unknown authn response", 400) + return goto_errors_response( + errors_url=current_app.conf.errors_url_template, + ctx=EduidErrorsContext.OIDC_RESPONSE_UNSOLICITED, + rp=url_for("freja_eid.authn_callback", _external=True), + ) + current_app.stats.count(name="authn_response") + + proofing_method = get_proofing_method(authn_req.method, authn_req.frontend_action, current_app.conf) + if not proofing_method: + # We _really_ shouldn't end up here because this same thing would have been done in the + # starting views above. + current_app.logger.warning(f"No proofing_method for method {authn_req.method}") + if not current_app.conf.errors_url_template: + return make_response("Unknown authn method", 400) + return goto_errors_response( + errors_url=current_app.conf.errors_url_template, + ctx=EduidErrorsContext.OIDC_RESPONSE_FAIL, + rp=url_for("freja_eid.authn_callback", _external=True), + ) + + formatted_finish_url = authn_req.formatted_finish_url(app_name=current_app.conf.app_name) + assert formatted_finish_url # please type checking + + try: + token_response = current_app.oidc_client.freja_eid.authorize_access_token() + current_app.logger.debug(f"Got token response: {token_response}") + except (OAuthError, KeyError): + # catch any exception from the oidc client and also exceptions about missing request arguments + current_app.logger.exception("Failed to get token response from Freja") + current_app.stats.count(name="token_response_failed") + authn_req.error = True + authn_req.status = FrejaEIDMsg.authorization_error.value + return redirect(formatted_finish_url) + + action = get_action(default_action=None, authndata=authn_req) + backdoor = check_magic_cookie(config=current_app.conf) + args = ACSArgs( + session_info=token_response.get("userinfo"), + authn_req=authn_req, + proofing_method=proofing_method, + backdoor=backdoor, + ) + result = action(args=args) + current_app.logger.debug(f"Callback action result: {result}") + + if not result.success: + current_app.logger.info(f"OIDC callback action failed: {result.message}") + current_app.stats.count(name="authn_action_failed") + args.authn_req.error = True + if result.message: + args.authn_req.status = result.message.value + args.authn_req.consumed = True + return redirect(formatted_finish_url) + + current_app.logger.debug(f"OIDC callback action successful (frontend_action {args.authn_req.frontend_action})") + if result.message: + args.authn_req.status = result.message.value + args.authn_req.consumed = True + return redirect(formatted_finish_url) diff --git a/src/eduid/webapp/jsconfig/settings/jsapps.py b/src/eduid/webapp/jsconfig/settings/jsapps.py index 09ac6ab3b..1253f1a58 100644 --- a/src/eduid/webapp/jsconfig/settings/jsapps.py +++ b/src/eduid/webapp/jsconfig/settings/jsapps.py @@ -32,6 +32,7 @@ class JsAppsConfig(PasswordConfigMixin): emails_service_url: HttpUrlStr # error_info_url needs to be a full URL since the backend is on the idp, not on https://eduid.se error_info_url: Optional[HttpUrlStr] = None + freja_eid_service_url: Optional[HttpUrlStr] = None group_mgmt_service_url: HttpUrlStr ladok_service_url: HttpUrlStr letter_proofing_service_url: HttpUrlStr diff --git a/src/eduid/webapp/svipe_id/helpers.py b/src/eduid/webapp/svipe_id/helpers.py index 49cb8174a..ad34e5215 100644 --- a/src/eduid/webapp/svipe_id/helpers.py +++ b/src/eduid/webapp/svipe_id/helpers.py @@ -30,7 +30,7 @@ class SvipeIDMsg(TranslatableMsg): identity_verify_success = "svipe_id.identity_verify_success" # Authorization error at Svipe ID authorization_error = "svipe_id.authorization_fail" - frontend_action_not_supported = "svipe_id.frontend-action-not-supported" + frontend_action_not_supported = "svipe_id.frontend_action_not_supported" class SessionOAuthCache: diff --git a/src/eduid/webapp/svipe_id/settings/common.py b/src/eduid/webapp/svipe_id/settings/common.py index a180ed142..6ee951b2f 100644 --- a/src/eduid/webapp/svipe_id/settings/common.py +++ b/src/eduid/webapp/svipe_id/settings/common.py @@ -2,6 +2,7 @@ from pydantic import AnyUrl, BaseModel, Field +from eduid.common.clients.oidc_client.base import AuthlibClientConfig from eduid.common.config.base import ( AmConfigMixin, EduIDBaseAppConfig, @@ -14,15 +15,6 @@ __author__ = "lundberg" -class AuthlibClientConfig(BaseModel): - client_id: str - client_secret: str - issuer: AnyUrl - code_challenge_method: str = Field(default="S256") - acr_values: list[str] = Field(default_factory=list) - scopes: list[str] = Field(default=["openid"]) - - class SvipeClientConfig(AuthlibClientConfig): acr_values: list[str] = Field(default=["face_present"]) scopes: list[str] = Field(default=["openid"]) diff --git a/src/eduid/workers/am/ams/__init__.py b/src/eduid/workers/am/ams/__init__.py index c6a0d217e..3e5e5608f 100644 --- a/src/eduid/workers/am/ams/__init__.py +++ b/src/eduid/workers/am/ams/__init__.py @@ -22,7 +22,12 @@ OrcidProofingUserDB, PhoneProofingUserDB, ) -from eduid.userdb.proofing.db import BankIDProofingUserDB, LadokProofingUserDB, SvideIDProofingUserDB +from eduid.userdb.proofing.db import ( + BankIDProofingUserDB, + FrejaEIDProofingUserDB, + LadokProofingUserDB, + SvideIDProofingUserDB, +) from eduid.userdb.reset_password import ResetPasswordUserDB from eduid.userdb.security import SecurityUserDB from eduid.userdb.signup import SignupUserDB @@ -314,6 +319,27 @@ def get_user_db(cls, uri: str) -> BankIDProofingUserDB: return BankIDProofingUserDB(uri) +class eduid_freja_eid(AttributeFetcher): + whitelist_set_attrs = [ + "passwords", + "identities", + "givenName", + "chosen_given_name", + "surname", + "legal_name", + ] + whitelist_unset_attrs: list[str] = [ + "identities", + "chosen_given_name", + "nins", # Old format + "displayName", # deprecated + ] + + @classmethod + def get_user_db(cls, uri: str) -> FrejaEIDProofingUserDB: + return FrejaEIDProofingUserDB(uri) + + class eduid_job_runner(AttributeFetcher): whitelist_set_attrs = ["terminated"] # skv cleaner checks status of registered persons