From edf4c0a491e5f3601140981b7d7edce1cbaa6c2a Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Thu, 12 Sep 2024 10:48:56 +0200 Subject: [PATCH 1/3] add endpoint to check authn status --- src/eduid/webapp/security/helpers.py | 4 +++ src/eduid/webapp/security/schemas.py | 12 ++++++++ src/eduid/webapp/security/views/security.py | 32 ++++++++++++++++++++- 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/src/eduid/webapp/security/helpers.py b/src/eduid/webapp/security/helpers.py index 08f0c4db0..874ef8af1 100644 --- a/src/eduid/webapp/security/helpers.py +++ b/src/eduid/webapp/security/helpers.py @@ -77,6 +77,10 @@ class SecurityMsg(TranslatableMsg): not_found = "security.not_found" # wrong identity type requested wrong_identity_type = "security.wrong-identity-type" + # Credential not found in the user's account + credential_not_found = "security.credential_not_found" + # frontend action is not implemented + frontend_action_not_supported = "security.frontend_action_not_supported" @dataclass diff --git a/src/eduid/webapp/security/schemas.py b/src/eduid/webapp/security/schemas.py index 663373bfa..2a4637837 100644 --- a/src/eduid/webapp/security/schemas.py +++ b/src/eduid/webapp/security/schemas.py @@ -138,3 +138,15 @@ class UserUpdatePayload(EduidSchema, CSRFRequestMixin): class SecurityKeysResponseSchema(FluxStandardAction): next_update = fields.DateTime(required=True) entries = fields.List(fields.String()) + + +class AuthnStatusRequestSchema(EduidSchema, CSRFRequestMixin): + frontend_action = fields.String(required=True) + credential_id = fields.String(required=False) + + +class AuthnStatusResponseSchema(FluxStandardAction): + class AuthnStatusPayload(EduidSchema, CSRFRequestMixin): + authn_status = fields.String(required=True) + + payload = fields.Nested(AuthnStatusPayload) diff --git a/src/eduid/webapp/security/views/security.py b/src/eduid/webapp/security/views/security.py index 844e2c284..d8c2631fe 100644 --- a/src/eduid/webapp/security/views/security.py +++ b/src/eduid/webapp/security/views/security.py @@ -1,9 +1,13 @@ +from typing import Optional + from flask import Blueprint from eduid.common.config.base import FrontendAction from eduid.common.misc.timeutil import utc_now from eduid.common.rpc.exceptions import AmTaskFailed from eduid.userdb import User +from eduid.userdb.credentials import FidoCredential +from eduid.userdb.element import ElementKey from eduid.userdb.exceptions import UserOutOfSync from eduid.userdb.identity import IdentityType from eduid.userdb.proofing import NinProofingElement @@ -14,7 +18,7 @@ from eduid.webapp.common.api.messages import CommonMsg, FluxData, error_response, success_response from eduid.webapp.common.api.schemas.csrf import EmptyRequest from eduid.webapp.common.api.utils import save_and_sync_user -from eduid.webapp.common.authn.utils import get_authn_for_action +from eduid.webapp.common.authn.utils import get_authn_for_action, validate_authn_for_action from eduid.webapp.common.authn.vccs import revoke_all_credentials from eduid.webapp.common.session import session from eduid.webapp.security.app import current_security_app as current_app @@ -29,6 +33,8 @@ ) from eduid.webapp.security.schemas import ( AccountTerminatedSchema, + AuthnStatusRequestSchema, + AuthnStatusResponseSchema, IdentitiesResponseSchema, IdentityRequestSchema, NINRequestSchema, @@ -251,3 +257,27 @@ def refresh_user_data(user: User) -> FluxData: return error_response(message=CommonMsg.temp_problem) return success_response(message=SecurityMsg.user_updated) + + +@security_views.route("/authn-status", methods=["POST"]) +@UnmarshalWith(AuthnStatusRequestSchema) +@MarshalWith(AuthnStatusResponseSchema) +@require_user +def check_authn_status(user: User, frontend_action: str, credential_id: Optional[ElementKey] = None) -> FluxData: + credential = None + if credential_id is not None: + credential = user.credentials.find(credential_id) + if credential is None or isinstance(credential, FidoCredential) is False: + current_app.logger.error(f"Can't find credential with id: {credential_id}") + return error_response(message=SecurityMsg.credential_not_found) + + try: + _frontend_action = FrontendAction(frontend_action) + except ValueError: + current_app.logger.error(f"Invalid frontend action: {frontend_action}") + return error_response(message=SecurityMsg.frontend_action_not_supported) + + authn_status = validate_authn_for_action( + config=current_app.conf, frontend_action=_frontend_action, user=user, credential_used=credential + ) + return success_response(payload={"authn_status": authn_status.value}) From 4524ca8680d061628145951d1439dc950455259a Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Thu, 12 Sep 2024 10:50:02 +0200 Subject: [PATCH 2/3] refactor test method add_token_to_user -> add_security_key_to_user --- src/eduid/webapp/bankid/tests/test_app.py | 51 +++++------------------ src/eduid/webapp/common/api/testing.py | 30 ++++++++++++- src/eduid/webapp/eidas/tests/test_app.py | 31 +------------- 3 files changed, 41 insertions(+), 71 deletions(-) diff --git a/src/eduid/webapp/bankid/tests/test_app.py b/src/eduid/webapp/bankid/tests/test_app.py index d33823dae..d04d644df 100644 --- a/src/eduid/webapp/bankid/tests/test_app.py +++ b/src/eduid/webapp/bankid/tests/test_app.py @@ -3,15 +3,12 @@ import logging import os import unittest -from typing import Any, Mapping, Optional, Union +from typing import Any, Mapping, Optional from unittest.mock import MagicMock, patch -from fido2.webauthn import AuthenticatorAttachment - from eduid.common.config.base import EduidEnvironment, FrontendAction from eduid.common.misc.timeutil import utc_now from eduid.userdb import NinIdentity -from eduid.userdb.credentials import U2F, Webauthn from eduid.userdb.credentials.external import BankIDCredential, SwedenConnectCredential from eduid.userdb.element import ElementKey from eduid.userdb.identity import IdentityProofingMethod @@ -207,32 +204,6 @@ def update_config(self, config: dict[str, Any]) -> dict[str, Any]: ) return config - def add_token_to_user(self, eppn: str, credential_id: str, token_type: str) -> Union[U2F, Webauthn]: - user = self.app.central_userdb.get_user_by_eppn(eppn) - mfa_token: Union[U2F, Webauthn] - if token_type == "u2f": - mfa_token = U2F( - version="test", - keyhandle=credential_id, - public_key="test", - app_id="test", - attest_cert="test", - description="test", - created_by="test", - ) - else: - mfa_token = Webauthn( - keyhandle=credential_id, - credential_data="test", - app_id="test", - description="test", - created_by="test", - authenticator=AuthenticatorAttachment.CROSS_PLATFORM, - ) - user.credentials.add(mfa_token) - self.request_user_sync(user) - return mfa_token - def add_nin_to_user(self, eppn: str, nin: str, verified: bool) -> NinIdentity: user = self.app.central_userdb.get_user_by_eppn(eppn) nin_element = NinIdentity(number=nin, created_by="test", is_verified=verified) @@ -506,7 +477,7 @@ def test_u2f_token_verify(self, mock_request_user_sync: MagicMock): mock_request_user_sync.side_effect = self.request_user_sync eppn = self.test_user.eppn - credential = self.add_token_to_user(eppn, "test", "u2f") + credential = self.add_security_key_to_user(eppn, "test", "u2f") self._verify_user_parameters(eppn) @@ -527,7 +498,7 @@ def test_webauthn_token_verify(self, mock_request_user_sync: MagicMock): eppn = self.test_user.eppn - credential = self.add_token_to_user(eppn, "test", "webauthn") + credential = self.add_security_key_to_user(eppn, "test", "webauthn") self._verify_user_parameters(eppn) @@ -545,7 +516,7 @@ def test_webauthn_token_verify(self, mock_request_user_sync: MagicMock): def test_mfa_token_verify_wrong_verified_nin(self): eppn = self.test_user.eppn nin = self.test_user_wrong_nin - credential = self.add_token_to_user(eppn, "test", "u2f") + credential = self.add_security_key_to_user(eppn, "test", "u2f") self._verify_user_parameters(eppn, identity=nin, identity_present=False) @@ -568,7 +539,7 @@ def test_mfa_token_verify_no_verified_nin(self, mock_request_user_sync: MagicMoc eppn = self.test_unverified_user_eppn nin = self.test_user_nin - credential = self.add_token_to_user(eppn, "test", "webauthn") + credential = self.add_security_key_to_user(eppn, "test", "webauthn") self._verify_user_parameters(eppn, identity_verified=False) @@ -589,7 +560,7 @@ def test_mfa_token_verify_no_verified_nin(self, mock_request_user_sync: MagicMoc def test_mfa_token_verify_no_mfa_login(self): eppn = self.test_user.eppn - credential = self.add_token_to_user(eppn, "test", "u2f") + credential = self.add_security_key_to_user(eppn, "test", "u2f") self._verify_user_parameters(eppn) @@ -614,7 +585,7 @@ def test_mfa_token_verify_no_mfa_login(self): def test_mfa_token_verify_no_mfa_token_in_session(self): eppn = self.test_user.eppn - credential = self.add_token_to_user(eppn, "test", "webauthn") + credential = self.add_security_key_to_user(eppn, "test", "webauthn") self._verify_user_parameters(eppn) @@ -633,7 +604,7 @@ def test_mfa_token_verify_no_mfa_token_in_session(self): def test_mfa_token_verify_aborted_auth(self): eppn = self.test_user.eppn - credential = self.add_token_to_user(eppn, "test", "u2f") + credential = self.add_security_key_to_user(eppn, "test", "u2f") self._verify_user_parameters(eppn) @@ -653,7 +624,7 @@ def test_mfa_token_verify_aborted_auth(self): def test_mfa_token_verify_cancel_auth(self): eppn = self.test_user.eppn - credential = self.add_token_to_user(eppn, "test", "webauthn") + credential = self.add_security_key_to_user(eppn, "test", "webauthn") self._verify_user_parameters(eppn) @@ -674,7 +645,7 @@ def test_mfa_token_verify_cancel_auth(self): def test_mfa_token_verify_auth_fail(self): eppn = self.test_user.eppn - credential = self.add_token_to_user(eppn, "test", "u2f") + credential = self.add_security_key_to_user(eppn, "test", "u2f") self._verify_user_parameters(eppn) @@ -699,7 +670,7 @@ def test_webauthn_token_verify_backdoor(self, mock_request_user_sync: MagicMock) eppn = self.test_unverified_user_eppn nin = self.test_backdoor_nin - credential = self.add_token_to_user(eppn, "test", "webauthn") + credential = self.add_security_key_to_user(eppn, "test", "webauthn") self._verify_user_parameters(eppn) diff --git a/src/eduid/webapp/common/api/testing.py b/src/eduid/webapp/common/api/testing.py index 955b70a01..6395075a4 100644 --- a/src/eduid/webapp/common/api/testing.py +++ b/src/eduid/webapp/common/api/testing.py @@ -8,8 +8,9 @@ from contextlib import contextmanager from copy import deepcopy from datetime import timedelta -from typing import Any, Generator, Generic, Iterable, Mapping, Optional, TypeVar, cast +from typing import Any, Generator, Generic, Iterable, Mapping, Optional, TypeVar, Union, cast +from fido2.webauthn import AuthenticatorAttachment from flask.testing import FlaskClient from werkzeug.test import TestResponse @@ -18,6 +19,7 @@ from eduid.common.rpc.msg_relay import FullPostalAddress, NavetData from eduid.common.testing_base import CommonTestCase from eduid.userdb import User +from eduid.userdb.credentials import U2F, Webauthn from eduid.userdb.db import BaseDB from eduid.userdb.element import ElementKey from eduid.userdb.fixtures.users import UserFixtures @@ -347,6 +349,32 @@ def set_authn_action( ) sess.authn.sp.authns[sp_authn_req.authn_id] = sp_authn_req + def add_security_key_to_user(self, eppn: str, keyhandle: str, token_type: str = "webauthn") -> Union[U2F, Webauthn]: + user = self.app.central_userdb.get_user_by_eppn(eppn) + mfa_token: Union[U2F, Webauthn] + if token_type == "u2f": + mfa_token = U2F( + version="test", + keyhandle=keyhandle, + public_key="test", + app_id="test", + attest_cert="test", + description="test", + created_by="test", + ) + else: + mfa_token = Webauthn( + keyhandle=keyhandle, + credential_data="test", + app_id="test", + description="test", + created_by="test", + authenticator=AuthenticatorAttachment.CROSS_PLATFORM, + ) + user.credentials.add(mfa_token) + self.request_user_sync(user) + return mfa_token + @staticmethod def _get_all_navet_data(): return NavetData.model_validate(MessageSender.get_devel_all_navet_data()) diff --git a/src/eduid/webapp/eidas/tests/test_app.py b/src/eduid/webapp/eidas/tests/test_app.py index fc459d7f2..d86c065f1 100644 --- a/src/eduid/webapp/eidas/tests/test_app.py +++ b/src/eduid/webapp/eidas/tests/test_app.py @@ -6,12 +6,9 @@ from unittest import TestCase from unittest.mock import MagicMock, patch -from fido2.webauthn import AuthenticatorAttachment - from eduid.common.config.base import EduidEnvironment, FrontendAction from eduid.common.misc.timeutil import utc_now from eduid.userdb import NinIdentity -from eduid.userdb.credentials import U2F, Webauthn from eduid.userdb.credentials.external import EidasCredential, ExternalCredential, SwedenConnectCredential from eduid.userdb.element import ElementKey from eduid.userdb.identity import EIDASIdentity, EIDASLoa, IdentityProofingMethod, PridPersistence @@ -228,32 +225,6 @@ def update_config(self, config: dict[str, Any]) -> dict[str, Any]: ) return config - def add_security_key_to_user(self, eppn: str, credential_id: str, token_type: str) -> Union[U2F, Webauthn]: - user = self.app.central_userdb.get_user_by_eppn(eppn) - mfa_token: Union[U2F, Webauthn] - if token_type == "u2f": - mfa_token = U2F( - version="test", - keyhandle=credential_id, - public_key="test", - app_id="test", - attest_cert="test", - description="test", - created_by="test", - ) - else: - mfa_token = Webauthn( - keyhandle=credential_id, - credential_data="test", - app_id="test", - description="test", - created_by="test", - authenticator=AuthenticatorAttachment.CROSS_PLATFORM, - ) - user.credentials.add(mfa_token) - self.request_user_sync(user) - return mfa_token - def add_nin_to_user(self, eppn: str, nin: str, verified: bool) -> NinIdentity: user = self.app.central_userdb.get_user_by_eppn(eppn) nin_element = NinIdentity(number=nin, created_by="test", is_verified=verified) @@ -570,7 +541,7 @@ def test_verify_credential(self, mock_request_user_sync: MagicMock): for security_key_type in ["u2f", "webauthn"]: credential = self.add_security_key_to_user( - eppn, credential_id=f"test_{security_key_type}", token_type=security_key_type + eppn, keyhandle=f"test_{security_key_type}", token_type=security_key_type ) self.verify_token( endpoint="/verify-credential", From d7c330ee8bccd4478a3fcae304c6b22c731571bc Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Thu, 12 Sep 2024 10:50:26 +0200 Subject: [PATCH 3/3] add tests for authn status endpoint --- src/eduid/webapp/security/tests/test_app.py | 64 +++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/src/eduid/webapp/security/tests/test_app.py b/src/eduid/webapp/security/tests/test_app.py index 3fa554133..29acc355a 100644 --- a/src/eduid/webapp/security/tests/test_app.py +++ b/src/eduid/webapp/security/tests/test_app.py @@ -193,6 +193,16 @@ def _get_credentials(self): with self.session_cookie(self.browser, eppn) as client: return client.get("/credentials") + def _get_authn_status(self, frontend_action: FrontendAction, credential_id: Optional[str] = None): + data = {"frontend_action": frontend_action.value} + if credential_id is not None: + data["credential_id"] = credential_id + eppn = self.test_user_data["eduPersonPrincipalName"] + with self.session_cookie(self.browser, eppn) as client: + with client.session_transaction() as sess: + data["csrf_token"] = sess.get_csrf_token() + return client.post("/authn-status", json=data) + # actual tests def test_delete_account_no_csrf(self): @@ -567,3 +577,57 @@ def test_get_credentials(self): ], } self._check_success_response(response, type_="GET_SECURITY_CREDENTIALS_SUCCESS", payload=expected_payload) + + def test_authn_status_ok(self): + frontend_action = FrontendAction.CHANGE_PW_AUTHN + self.set_authn_action( + eppn=self.test_user_eppn, + frontend_action=frontend_action, + ) + response = self._get_authn_status(frontend_action=frontend_action) + self._check_success_response( + response=response, + type_="POST_SECURITY_AUTHN_STATUS_SUCCESS", + payload={"authn_status": AuthnActionStatus.OK.value}, + ) + + def test_authn_status_stale(self): + frontend_action = FrontendAction.CHANGE_PW_AUTHN + self.set_authn_action(eppn=self.test_user_eppn, frontend_action=frontend_action, age=timedelta(minutes=10)) + response = self._get_authn_status(frontend_action=frontend_action) + self._check_success_response( + response=response, + type_="POST_SECURITY_AUTHN_STATUS_SUCCESS", + payload={"authn_status": AuthnActionStatus.STALE.value}, + ) + + def test_authn_status_no_mfa(self): + frontend_action = FrontendAction.REMOVE_SECURITY_KEY_AUTHN + self.set_authn_action(eppn=self.test_user_eppn, frontend_action=frontend_action) + response = self._get_authn_status(frontend_action=frontend_action) + self._check_success_response( + response=response, + type_="POST_SECURITY_AUTHN_STATUS_SUCCESS", + payload={"authn_status": AuthnActionStatus.NO_MFA.value}, + ) + + def test_authn_status_credential_not_existing(self): + frontend_action = FrontendAction.VERIFY_CREDENTIAL + self.set_authn_action(eppn=self.test_user_eppn, frontend_action=frontend_action, force_mfa=True) + response = self._get_authn_status(frontend_action=frontend_action, credential_id="none_existing_credential_id") + self._check_error_response( + response=response, + type_="POST_SECURITY_AUTHN_STATUS_FAIL", + msg=SecurityMsg.credential_not_found, + ) + + def test_authn_status_credential_not_used(self): + frontend_action = FrontendAction.VERIFY_CREDENTIAL + credential = self.add_security_key_to_user(self.test_user_eppn, keyhandle="keyhandle_1") + self.set_authn_action(eppn=self.test_user_eppn, frontend_action=frontend_action, force_mfa=True) + response = self._get_authn_status(frontend_action=frontend_action, credential_id=credential.key) + self._check_success_response( + response=response, + type_="POST_SECURITY_AUTHN_STATUS_SUCCESS", + payload={"authn_status": AuthnActionStatus.CREDENTIAL_NOT_RECENTLY_USED.value}, + )