diff --git a/src/eduid/webapp/common/api/decorators.py b/src/eduid/webapp/common/api/decorators.py index a06db61a6..22fa7ea4e 100644 --- a/src/eduid/webapp/common/api/decorators.py +++ b/src/eduid/webapp/common/api/decorators.py @@ -63,6 +63,22 @@ def require_eppn_decorator(*args: Any, **kwargs: Any) -> EduidViewReturnType: return require_eppn_decorator +def require_not_logged_in(f: EduidRouteCallable) -> EduidRouteCallable: + """ + Decorator for views that require the user not to be logged in. + + Because it can return a FluxData, this decorator must come after the MarshalWith decorator. + """ + + @wraps(f) + def require_eppn_decorator(*args: Any, **kwargs: Any) -> EduidViewReturnType: + if session.common.is_logged_in: + return error_response(message=CommonMsg.logout_required) + return f(*args, **kwargs) + + return require_eppn_decorator + + TRequireUserResult = TypeVar("TRequireUserResult") diff --git a/src/eduid/webapp/common/api/messages.py b/src/eduid/webapp/common/api/messages.py index 44639c1e0..f3b3efe7e 100644 --- a/src/eduid/webapp/common/api/messages.py +++ b/src/eduid/webapp/common/api/messages.py @@ -43,6 +43,8 @@ class CommonMsg(TranslatableMsg): nin_invalid = "nin needs to be formatted as 18|19|20yymmddxxxx" # Email address validation error email_invalid = "email needs to be formatted according to RFC2822" + # user must log out + logout_required = "logout_required" # TODO: These _should_ be unused now - check and remove csrf_try_again = "csrf.try_again" csrf_missing = "csrf.missing" diff --git a/src/eduid/webapp/common/api/testing.py b/src/eduid/webapp/common/api/testing.py index bb8f7c137..8c6ed18d0 100644 --- a/src/eduid/webapp/common/api/testing.py +++ b/src/eduid/webapp/common/api/testing.py @@ -71,6 +71,8 @@ "saml2": {"level": "WARNING"}, "xmlsec": {"level": "INFO"}, "urllib3": {"level": "INFO"}, + "pymongo.serverSelection": {"level": "INFO"}, + "pymongo.command": {"level": "INFO"}, "eduid.webapp.common.session": {"level": "INFO"}, "eduid.userdb.userdb.extra_debug": {"level": "INFO"}, "eduid.userdb.db.extra_debug": {"level": "INFO"}, diff --git a/src/eduid/webapp/common/session/namespaces.py b/src/eduid/webapp/common/session/namespaces.py index 0d6103623..4c5453e70 100644 --- a/src/eduid/webapp/common/session/namespaces.py +++ b/src/eduid/webapp/common/session/namespaces.py @@ -34,7 +34,7 @@ class SessionNSBase(BaseModel, ABC): def to_dict(self) -> dict[str, Any]: - return self.dict() + return self.model_dump() @classmethod def from_dict(cls: type[TSessionNSSubclass], data: Mapping[str, Any]) -> TSessionNSSubclass: @@ -51,6 +51,12 @@ def _from_dict_transform(cls: type[SessionNSBase], data: Mapping[str, Any]) -> d _data = deepcopy(data) # do not modify callers data return dict(_data) + def clear(self): + """ + Clears all session namespace data. + """ + self.__dict__ = self.model_construct(_cls=self.__class__, field_set={}).__dict__ + TSessionNSSubclass = TypeVar("TSessionNSSubclass", bound=SessionNSBase) diff --git a/src/eduid/webapp/common/session/tests/test_namespaces.py b/src/eduid/webapp/common/session/tests/test_namespaces.py index e5fc0b20e..30602330f 100644 --- a/src/eduid/webapp/common/session/tests/test_namespaces.py +++ b/src/eduid/webapp/common/session/tests/test_namespaces.py @@ -102,6 +102,26 @@ def test_to_dict_from_dict_with_timestamp(self): assert second.idp.sso_cookie_val == first.idp.sso_cookie_val assert second.idp.ts == first.idp.ts + def test_clear_namespace(self): + _meta = SessionMeta.new(app_secret="secret") + base_session = self.app.session_interface.manager.get_session(meta=_meta, new=True) + first = EduidSession(app=self.app, meta=_meta, base_session=base_session, new=True) + first.signup.email.address = "test@example.com" + first.signup.email.verification_code = "123456" + first.persist() + # load session again and clear it + base_session = self.app.session_interface.manager.get_session(meta=_meta, new=False) + second = EduidSession(self.app, _meta, base_session, new=False) + assert second.signup.email.address == "test@example.com" + assert second.signup.email.verification_code == "123456" + second.signup.clear() + second.signup.email.address = "test@example.com" + second.persist() + # load session one more time and make sure verification_code is empty + base_session = self.app.session_interface.manager.get_session(meta=_meta, new=False) + third = EduidSession(self.app, _meta, base_session, new=False) + assert third.signup.email.address == "test@example.com" + assert third.signup.email.verification_code is None class TestAuthnNamespace(EduidAPITestCase): app: SessionTestApp diff --git a/src/eduid/webapp/signup/tests/test_app.py b/src/eduid/webapp/signup/tests/test_app.py index 75cd6ce3c..2e263e47a 100644 --- a/src/eduid/webapp/signup/tests/test_app.py +++ b/src/eduid/webapp/signup/tests/test_app.py @@ -3,7 +3,7 @@ from dataclasses import dataclass from datetime import datetime, timedelta, timezone from enum import Enum -from typing import Any, Mapping, Optional, Union +from typing import Any, Mapping, Optional from unittest.mock import MagicMock, patch from uuid import uuid4 @@ -83,8 +83,17 @@ def update_config(self, config: dict[str, Any]) -> dict[str, Any]: ) return config - def _get_captcha(self): - with self.session_cookie_anon(self.browser) as client: + def _get_captcha( + self, + expect_success: bool = True, + expected_message: Optional[TranslatableMsg] = None, + logged_in: bool = False, + ): + eppn = None + if logged_in: + eppn = self.test_user.eppn + + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: with self.app.test_request_context(): endpoint = url_for("signup.captcha_request") with client.session_transaction() as sess: @@ -92,17 +101,29 @@ def _get_captcha(self): "csrf_token": sess.get_csrf_token(), } response = client.post(f"{endpoint}", data=json.dumps(data), content_type=self.content_type_json) + + if expect_success: + type_ = "POST_SIGNUP_GET_CAPTCHA_SUCCESS" + assert self.get_response_payload(response)["captcha_img"].startswith("data:image/png;base64,") + assert self.get_response_payload(response)["captcha_audio"].startswith("data:audio/wav;base64,") + else: + type_ = "POST_SIGNUP_GET_CAPTCHA_FAIL" + self._check_api_response( response, status=200, - type_="POST_SIGNUP_GET_CAPTCHA_SUCCESS", + type_=type_, + message=expected_message, ) - assert self.get_response_payload(response)["captcha_img"].startswith("data:image/png;base64,") - assert self.get_response_payload(response)["captcha_audio"].startswith("data:audio/wav;base64,") + return SignupResult(url=endpoint, reached_state=SignupState.S9_GENERATE_CAPTCHA, response=response) - def _get_state(self) -> SignupResult: - with self.session_cookie_anon(self.browser) as client: + def _get_state(self, logged_in: bool = False) -> SignupResult: + eppn = None + if logged_in: + eppn = self.test_user.eppn + + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: with self.app.test_request_context(): endpoint = url_for("signup.get_state") logger.info(f"Making GET request to {endpoint}") @@ -117,34 +138,30 @@ def _get_state(self) -> SignupResult: def _captcha( self, captcha_data: Optional[Mapping[str, Any]] = None, - generate_internal_captcha: bool = True, add_magic_cookie: bool = False, magic_cookie_name: Optional[str] = None, expect_success: bool = True, expected_message: Optional[TranslatableMsg] = None, expected_payload: Optional[Mapping[str, Any]] = None, + logged_in: bool = False, ): """ :param captcha_data: to control the data POSTed to the /captcha endpoint :param add_magic_cookie: add magic cookie to the captcha request """ - with self.session_cookie_anon(self.browser) as client: + eppn = None + if logged_in: + eppn = self.test_user.eppn + + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: with self.app.test_request_context(): endpoint = url_for("signup.captcha_response") - if generate_internal_captcha: - self._get_captcha() - with client.session_transaction() as sess: - assert sess.signup.captcha.internal_answer - data = { - "csrf_token": sess.get_csrf_token(), - "internal_response": sess.signup.captcha.internal_answer, - } - else: - with client.session_transaction() as sess: - data = { - "csrf_token": sess.get_csrf_token(), - } + with client.session_transaction() as sess: + data = { + "csrf_token": sess.get_csrf_token(), + "internal_response": sess.signup.captcha.internal_answer, + } if add_magic_cookie: assert self.app.conf.magic_cookie_name is not None @@ -203,6 +220,7 @@ def _register_email( expect_success: bool = True, expected_message: Optional[TranslatableMsg] = None, expected_payload: Optional[Mapping[str, Any]] = None, + logged_in: bool = False, ): """ Trigger sending an email with a verification code. @@ -210,8 +228,11 @@ def _register_email( :param data1: to control the data POSTed to the verify email endpoint :param email: what email address to use """ + eppn = None + if logged_in: + eppn = self.test_user.eppn - with self.session_cookie_anon(self.browser) as client: + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: with self.app.test_request_context(): endpoint = url_for("signup.register_email") with client.session_transaction() as sess: @@ -270,14 +291,18 @@ def _verify_email( expect_success: bool = True, expected_message: Optional[TranslatableMsg] = None, expected_payload: Optional[Mapping[str, Any]] = None, + logged_in: bool = False, ): """ Verify registered email with a verification code. :param data1: to control the data POSTed to the verify email endpoint """ + eppn = None + if logged_in: + eppn = self.test_user.eppn - with self.session_cookie_anon(self.browser) as client: + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: with self.app.test_request_context(): endpoint = url_for("signup.verify_email") with client.session_transaction() as sess: @@ -336,6 +361,7 @@ def _accept_tou( expect_success: bool = True, expected_message: Optional[TranslatableMsg] = None, expected_payload: Optional[Mapping[str, Any]] = None, + logged_in: bool = False, ): """ Verify registered email with a verification code. @@ -346,7 +372,11 @@ def _accept_tou( if tou_version is None: tou_version = self.app.conf.tou_version - with self.session_cookie_anon(self.browser) as client: + eppn = None + if logged_in: + eppn = self.test_user.eppn + + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: with self.app.test_request_context(): endpoint = url_for("signup.accept_tou") with client.session_transaction() as sess: @@ -399,13 +429,18 @@ def _generate_password( expect_success: bool = True, expected_message: Optional[TranslatableMsg] = None, expected_payload: Optional[Mapping[str, Any]] = None, + logged_in: bool = False, ): """ Generate a password and return in state. :param data1: to control the data POSTed to the verify email endpoint """ - with self.session_cookie_anon(self.browser) as client: + eppn = None + if logged_in: + eppn = self.test_user.eppn + + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: with self.app.test_request_context(): endpoint = url_for("signup.get_password") with client.session_transaction() as sess: @@ -458,8 +493,13 @@ def _prepare_for_create_user( captcha_completed: bool = True, email_verified: bool = True, generated_password: Optional[str] = "test_password", + logged_in: bool = False, ): - with self.session_cookie_anon(self.browser) as client: + eppn = None + if logged_in: + eppn = self.test_user.eppn + + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: with client.session_transaction() as sess: sess.signup.name.given_name = given_name sess.signup.name.surname = surname @@ -481,13 +521,19 @@ def _create_user( expect_success: bool = True, expected_message: Optional[TranslatableMsg] = None, expected_payload: Optional[Mapping[str, Any]] = None, + logged_in: bool = False, ): """ Create a new user with the data in the session. """ mock_add_credentials.return_value = True mock_request_user_sync.side_effect = self.request_user_sync - with self.session_cookie_anon(self.browser) as client: + + eppn = None + if logged_in: + eppn = self.test_user.eppn + + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: with client.session_transaction() as sess: with self.app.test_request_context(): endpoint = url_for("signup.create_user") @@ -573,6 +619,7 @@ def _get_invite_data( expect_success: bool = True, expected_message: Optional[TranslatableMsg] = None, expected_payload: Optional[Mapping[str, Any]] = None, + logged_in: bool = False, ): """ Get invite data from the invite data endpoint. @@ -580,24 +627,17 @@ def _get_invite_data( with self.app.test_request_context(): endpoint = url_for("signup.get_invite") - if eppn is None: - with self.session_cookie_anon(self.browser) as client: - with client.session_transaction() as sess: - data = { - "invite_code": invite_code, - "csrf_token": sess.get_csrf_token(), - } - if data1 is not None: - data.update(data1) - else: - with self.session_cookie(self.browser, eppn=eppn) as client: - with client.session_transaction() as sess: - data = { - "invite_code": invite_code, - "csrf_token": sess.get_csrf_token(), - } - if data1 is not None: - data.update(data1) + if eppn is None and logged_in: + eppn = self.test_user.eppn + + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: + with client.session_transaction() as sess: + data = { + "invite_code": invite_code, + "csrf_token": sess.get_csrf_token(), + } + if data1 is not None: + data.update(data1) logger.info(f"Making request to {endpoint}") response = client.post(f"{endpoint}", data=json.dumps(data), content_type=self.content_type_json) @@ -655,8 +695,13 @@ def _accept_invite( expect_success: bool = True, expected_message: Optional[TranslatableMsg] = None, expected_payload: Optional[Mapping[str, Any]] = None, + logged_in: bool = False, ): - with self.session_cookie_anon(self.browser) as client: + eppn = None + if logged_in: + eppn = self.test_user.eppn + + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: with client.session_transaction() as sess: with self.app.test_request_context(): endpoint = url_for("signup.accept_invite") @@ -718,33 +763,22 @@ def _complete_invite( expected_payload: Optional[Mapping[str, Any]] = None, ): mock_request_user_sync.side_effect = self.request_user_sync + logged_in = False + if eppn: + logged_in = True - if eppn is None: - with self.session_cookie_anon(self.browser) as client: - with client.session_transaction() as sess: - with self.app.test_request_context(): - endpoint = url_for("signup.complete_invite") - data = { - "csrf_token": sess.get_csrf_token(), - } - if data1 is not None: - data.update(data1) - - logger.info(f"Making request to {endpoint}") - response = client.post(f"{endpoint}", data=json.dumps(data), content_type=self.content_type_json) - else: - with self.session_cookie(self.browser, eppn=eppn) as client: - with client.session_transaction() as sess: - with self.app.test_request_context(): - endpoint = url_for("signup.complete_invite") - data = { - "csrf_token": sess.get_csrf_token(), - } - if data1 is not None: - data.update(data1) - - logger.info(f"Making request to {endpoint}") - response = client.post(f"{endpoint}", data=json.dumps(data), content_type=self.content_type_json) + with self.session_cookie(self.browser, eppn=eppn, logged_in=logged_in) as client: + with client.session_transaction() as sess: + with self.app.test_request_context(): + endpoint = url_for("signup.complete_invite") + data = { + "csrf_token": sess.get_csrf_token(), + } + if data1 is not None: + data.update(data1) + + logger.info(f"Making request to {endpoint}") + response = client.post(f"{endpoint}", data=json.dumps(data), content_type=self.content_type_json) logger.info(f"Request to {endpoint} result: {response}") @@ -806,10 +840,28 @@ def test_get_state_initial(self): "user_created": False, }, f"actual state is {state}" + def test_get_state_initial_logged_in(self): + res = self._get_state(logged_in=True) + assert res.reached_state == SignupState.S10_GET_STATE + state = self.get_response_payload(res.response)["state"] + assert state == { + "already_signed_up": True, + "captcha": {"completed": False}, + "credentials": {"completed": False, "password": None}, + "email": {"address": None, "bad_attempts": 0, "bad_attempts_max": 3, "completed": False, "sent_at": None}, + "invite": {"completed": False, "finish_url": None, "initiated_signup": False}, + "name": {"given_name": None, "surname": None}, + "tou": {"completed": False, "version": "2016-v1"}, + "user_created": False, + }, f"actual state is {state}" + def test_accept_tou(self): res = self._accept_tou() assert res.reached_state == SignupState.S2_ACCEPT_TOU + def test_accept_tou_logged_in(self): + self._accept_tou(logged_in=True, expect_success=False, expected_message=CommonMsg.logout_required) + def test_not_accept_tou(self): res = self._accept_tou(accept_tou=False, expect_success=False, expected_message=SignupMsg.tou_not_completed) assert res.reached_state == SignupState.S2_ACCEPT_TOU @@ -840,15 +892,23 @@ def test_get_password_bad_csrf(self): assert self.get_response_payload(res.response)["error"] == {"csrf_token": ["CSRF failed to validate"]} def test_captcha(self): + res = self._get_captcha() + assert res.reached_state == SignupState.S9_GENERATE_CAPTCHA res = self._captcha() assert res.reached_state == SignupState.S3_COMPLETE_CAPTCHA + def test_captcha_logged_in(self): + res = self._get_captcha() + assert res.reached_state == SignupState.S9_GENERATE_CAPTCHA + self._captcha(logged_in=True, expect_success=False, expected_message=CommonMsg.logout_required) + def test_captcha_new_wrong_csrf(self): data = {"csrf_token": "wrong-token"} res = self._captcha(captcha_data=data, expect_success=False, expected_message=None) assert self.get_response_payload(res.response)["error"] == {"csrf_token": ["CSRF failed to validate"]} def test_captcha_fail(self): + self._get_captcha() res = self._captcha( captcha_data={"internal_response": "wrong"}, expect_success=False, @@ -858,8 +918,8 @@ def test_captcha_fail(self): def test_captcha_internal_fail_to_many_attempts(self): # run once to generate captcha + self._get_captcha() self._captcha( - generate_internal_captcha=True, captcha_data={"internal_response": "wrong"}, expect_success=False, expected_message=SignupMsg.captcha_failed, @@ -867,14 +927,12 @@ def test_captcha_internal_fail_to_many_attempts(self): for _ in range(self.app.conf.captcha_max_bad_attempts): # make x bad attempts to get over the limit self._captcha( - generate_internal_captcha=False, captcha_data={"internal_response": "wrong"}, expect_success=False, expected_message=SignupMsg.captcha_failed, ) # try one more time, should fail even as we use the correct code res = self._captcha( - generate_internal_captcha=False, expect_success=False, expected_message=SignupMsg.captcha_failed, ) @@ -882,7 +940,6 @@ def test_captcha_internal_fail_to_many_attempts(self): def test_captcha_internal_not_requested(self): res = self._captcha( - generate_internal_captcha=False, captcha_data={"internal_response": "not-requested"}, expect_success=False, expected_message=SignupMsg.captcha_not_requested, @@ -894,6 +951,7 @@ def test_captcha_backdoor(self): self.app.conf.magic_cookie_name = "magic" self.app.conf.environment = EduidEnvironment("dev") + self._get_captcha() res = self._captcha( add_magic_cookie=True, expect_success=True, @@ -905,6 +963,7 @@ def test_captcha_backdoor_right_code(self): self.app.conf.magic_cookie_name = "magic" self.app.conf.environment = EduidEnvironment("dev") + self._get_captcha() res = self._captcha( add_magic_cookie=True, captcha_data={"internal_response": self.app.conf.captcha_backdoor_code}, @@ -917,6 +976,7 @@ def test_captcha_backdoor_wrong_code(self): self.app.conf.magic_cookie_name = "magic" self.app.conf.environment = EduidEnvironment("dev") + self._get_captcha() res = self._captcha( add_magic_cookie=True, captcha_data={"internal_response": "wrong"}, @@ -929,6 +989,7 @@ def test_captcha_no_backdoor_in_pro(self): self.app.conf.magic_cookie = "magic-cookie" self.app.conf.magic_cookie_name = "magic" self.app.conf.environment = EduidEnvironment("production") + self._get_captcha() res = self._captcha( add_magic_cookie=True, expect_success=False, @@ -940,6 +1001,7 @@ def test_captcha_no_backdoor_misconfigured1(self): self.app.conf.magic_cookie = "magic-cookie" self.app.conf.magic_cookie_name = "" self.app.conf.environment = EduidEnvironment("dev") + self._get_captcha() res = self._captcha( add_magic_cookie=True, expect_success=False, @@ -952,6 +1014,7 @@ def test_captcha_no_backdoor_misconfigured2(self): self.app.conf.magic_cookie = "" self.app.conf.magic_cookie_name = "magic" self.app.conf.environment = EduidEnvironment("dev") + self._get_captcha() res = self._captcha( add_magic_cookie=True, expect_success=False, @@ -960,7 +1023,7 @@ def test_captcha_no_backdoor_misconfigured2(self): assert res.reached_state == SignupState.S3_COMPLETE_CAPTCHA def test_captcha_no_data_fail(self): - with self.session_cookie_anon(self.browser) as client: + with self.session_cookie(self.browser, eppn=None) as client: response = client.post("/captcha") self.assertEqual(response.status_code, 200) data = json.loads(response.data) @@ -972,6 +1035,7 @@ def test_register_new_user(self): given_name = "John" surname = "Smith" email = "jsmith@example.com" + self._get_captcha() self._captcha() res = self._register_email( given_name=given_name, @@ -982,13 +1046,29 @@ def test_register_new_user(self): ) assert res.reached_state == SignupState.S4_REGISTER_EMAIL assert self.app.messagedb.db_count() == 1 - with self.session_cookie_anon(self.browser) as client: + with self.session_cookie(self.browser, eppn=None) as client: with client.session_transaction() as sess: assert sess.signup.email.address == email assert sess.signup.name.given_name == given_name assert sess.signup.name.surname == surname + def test_register_new_user_logged_in(self): + given_name = "John" + surname = "Smith" + email = "jsmith@example.com" + self._get_captcha() + self._captcha() + self._register_email( + given_name=given_name, + surname=surname, + email=email, + logged_in=True, + expect_success=False, + expected_message=CommonMsg.logout_required, + ) + def test_register_new_user_mixed_case(self): + self._get_captcha() self._captcha() mixed_case_email = "MixedCase@example.com" res = self._register_email(email=mixed_case_email) @@ -999,6 +1079,7 @@ def test_register_new_user_mixed_case(self): assert sess.signup.email.address == mixed_case_email.lower() def test_register_existing_user(self): + self._get_captcha() self._captcha() res = self._register_email( email="johnsmith@example.com", expect_success=False, expected_message=SignupMsg.email_used @@ -1006,6 +1087,7 @@ def test_register_existing_user(self): assert res.reached_state == SignupState.S4_REGISTER_EMAIL def test_register_existing_user_mixed_case(self): + self._get_captcha() self._captcha() res = self._register_email( email="JohnSmith@Example.com", expect_success=False, expected_message=SignupMsg.email_used @@ -1014,6 +1096,7 @@ def test_register_existing_user_mixed_case(self): def test_register_existing_signup_user(self): # TODO: for backwards compatibility, remove when compatibility code in view is removed + self._get_captcha() self._captcha() res = self._register_email(email="johnsmith2@example.com") assert res.reached_state == SignupState.S4_REGISTER_EMAIL @@ -1021,15 +1104,17 @@ def test_register_existing_signup_user(self): def test_register_existing_signup_user_mixed_case(self): # TODO: for backwards compatibility, remove when compatibility code in view is removed mixed_case_email = "JohnSmith2@Example.com" + self._get_captcha() self._captcha() res = self._register_email(email=mixed_case_email) assert res.reached_state == SignupState.S4_REGISTER_EMAIL - with self.session_cookie_anon(self.browser) as client: + with self.session_cookie(self.browser, eppn=False, logged_in=False) as client: with client.session_transaction() as sess: assert sess.signup.email.address == mixed_case_email.lower() def test_register_user_resend(self): + self._get_captcha() self._captcha() self._register_email(expect_success=True, expected_message=None) with self.session_cookie_anon(self.browser) as client: @@ -1044,6 +1129,7 @@ def test_register_user_resend(self): assert self.app.messagedb.db_count() == 2 def test_register_user_resend_email_throttled(self): + self._get_captcha() self._captcha() self._register_email(expect_success=True, expected_message=None) res = self._register_email(expect_success=False, expected_message=SignupMsg.email_throttled) @@ -1051,6 +1137,7 @@ def test_register_user_resend_email_throttled(self): assert self.app.messagedb.db_count() == 1 def test_register_user_resend_mail_expired(self): + self._get_captcha() self._captcha() self._register_email(expect_success=True, expected_message=None) with self.session_cookie_anon(self.browser) as client: @@ -1065,12 +1152,20 @@ def test_register_user_resend_mail_expired(self): assert self.app.messagedb.db_count() == 2 def test_verify_email(self): + self._get_captcha() self._captcha() self._register_email() response = self._verify_email() assert response.reached_state == SignupState.S5_VERIFY_EMAIL + def test_verify_email_logged_in(self): + self._get_captcha() + self._captcha() + self._register_email() + self._verify_email(logged_in=True, expect_success=False, expected_message=CommonMsg.logout_required) + def test_verify_email_wrong_code(self): + self._get_captcha() self._captcha() self._register_email() data = {"verification_code": "wrong"} @@ -1080,6 +1175,7 @@ def test_verify_email_wrong_code(self): assert response.reached_state == SignupState.S5_VERIFY_EMAIL def test_verify_email_wrong_code_to_many_attempts(self): + self._get_captcha() self._captcha() self._register_email() data = {"verification_code": "wrong"} @@ -1094,6 +1190,7 @@ def test_verify_email_wrong_code_to_many_attempts(self): def test_verify_email_mixed_case(self): mixed_case_email = "MixedCase@Example.com" + self._get_captcha() self._captcha() self._register_email(email=mixed_case_email) response = self._verify_email() @@ -1121,14 +1218,10 @@ def test_create_user(self): assert user.surname == surname assert user.mail_addresses.to_list()[0].email == email - def test_create_user_eppn_in_session(self): + def test_create_user_logged_in(self): email = "test@example.com" self._prepare_for_create_user(email=email) - with self.session_cookie_anon(self.browser) as client: - with client.session_transaction() as sess: - sess.common.eppn = "some-eppn" - response = self._create_user(expect_success=False, expected_message=SignupMsg.user_already_exists) - assert response.reached_state == SignupState.S6_CREATE_USER + self._create_user(logged_in=True, expect_success=False, expected_message=CommonMsg.logout_required) def test_create_user_out_of_sync(self): self._prepare_for_create_user() @@ -1194,7 +1287,10 @@ def test_get_invite_data(self): def test_get_invite_data_already_logged_in(self): invite = self._create_invite() res = self._get_invite_data( - email=invite.get_primary_mail_address(), invite_code=invite.invite_code, eppn=self.test_user.eppn + email=invite.get_primary_mail_address(), + invite_code=invite.invite_code, + eppn=self.test_user.eppn, + logged_in=True, ) assert res.reached_state == SignupState.S0_GET_INVITE_DATA @@ -1312,7 +1408,7 @@ def test_get_code_backdoor(self): self._register_email(email=email) response = self._get_code_backdoor(email=email) - with self.session_cookie_anon(self.browser) as client: + with self.session_cookie(self.browser, eppn=None) as client: with client.session_transaction() as sess: assert response.text == sess.signup.email.verification_code diff --git a/src/eduid/webapp/signup/views.py b/src/eduid/webapp/signup/views.py index 8cfe3179e..b8bf8226c 100644 --- a/src/eduid/webapp/signup/views.py +++ b/src/eduid/webapp/signup/views.py @@ -7,7 +7,7 @@ from eduid.common.utils import generate_password from eduid.userdb import User from eduid.userdb.exceptions import UserOutOfSync -from eduid.webapp.common.api.decorators import MarshalWith, UnmarshalWith, require_user +from eduid.webapp.common.api.decorators import MarshalWith, UnmarshalWith, require_not_logged_in, require_user from eduid.webapp.common.api.exceptions import ProofingLogFailure from eduid.webapp.common.api.helpers import check_magic_cookie from eduid.webapp.common.api.messages import CommonMsg, FluxData, error_response, success_response @@ -55,6 +55,7 @@ def get_state(): @signup_views.route("/register-email", methods=["POST"]) @UnmarshalWith(NameAndEmailSchema) @MarshalWith(SignupStatusResponse) +@require_not_logged_in def register_email(given_name: str, surname: str, email: str): """ Register a with new email address. @@ -65,8 +66,13 @@ def register_email(given_name: str, surname: str, email: str): if not session.signup.captcha.completed: # don't allow registration without captcha completion # this is so that a malicious user can't send a lot of emails or enumerate email addresses already registered + current_app.logger.info("Captcha not completed") return error_response(message=SignupMsg.captcha_not_completed) + if session.signup.email.completed: + current_app.logger.info("Email already verified") + return success_response(payload={"state": session.signup.to_dict()}) + email_status = check_email_status(email) if email_status == EmailStatus.ADDRESS_USED: current_app.logger.info("Email address already used") @@ -79,6 +85,9 @@ def register_email(given_name: str, surname: str, email: str): current_app.stats.count(name="resend_code") elif email_status == EmailStatus.NEW: current_app.logger.info("Starting new signup") + # make sure the session is clean + session.signup.email.clear() + session.signup.name.clear() session.signup.name.given_name = given_name session.signup.name.surname = surname session.signup.email.address = email @@ -108,6 +117,7 @@ def register_email(given_name: str, surname: str, email: str): @signup_views.route("/verify-email", methods=["POST"]) @UnmarshalWith(VerifyEmailRequest) @MarshalWith(SignupStatusResponse) +@require_not_logged_in def verify_email(verification_code: str): """ Verify the email address. @@ -117,8 +127,13 @@ def verify_email(verification_code: str): current_app.logger.debug(f"verification code: {verification_code}") if not session.signup.captcha.completed: + current_app.logger.info("Captcha not completed") return error_response(message=SignupMsg.captcha_not_completed) + if session.signup.email.completed: + current_app.logger.info("Email already verified") + return success_response(payload={"state": session.signup.to_dict()}) + if is_email_verification_expired(sent_ts=session.signup.email.sent_at): current_app.logger.info("Email verification expired") return error_response(message=SignupMsg.email_verification_expired) @@ -146,11 +161,17 @@ def verify_email(verification_code: str): @signup_views.route("/accept-tou", methods=["POST"]) @UnmarshalWith(AcceptTouRequest) @MarshalWith(SignupStatusResponse) +@require_not_logged_in def accept_tou(tou_accepted: bool, tou_version: str): """ Accept the Terms of Use. """ - current_app.logger.info("Accepting ToU") + current_app.logger.info(f"Accepting ToU: {tou_accepted}, version: {tou_version}") + + if session.signup.tou.completed: + current_app.logger.info("ToU already completed") + return success_response(payload={"state": session.signup.to_dict()}) + if not tou_accepted: current_app.logger.info("ToU not completed") return error_response(message=SignupMsg.tou_not_completed) @@ -167,6 +188,7 @@ def accept_tou(tou_accepted: bool, tou_version: str): @signup_views.route("/get-captcha", methods=["POST"]) @UnmarshalWith(EmptyRequest) @MarshalWith(CaptchaResponse) +@require_not_logged_in def captcha_request() -> FluxData: if session.signup.captcha.completed: return error_response(message=SignupMsg.captcha_already_completed) @@ -180,12 +202,17 @@ def captcha_request() -> FluxData: @signup_views.route("/captcha", methods=["POST"]) @UnmarshalWith(CaptchaCompleteRequest) @MarshalWith(SignupStatusResponse) +@require_not_logged_in def captcha_response(internal_response: Optional[str] = None) -> FluxData: """ Check for humanness even at level AL1. """ current_app.logger.info("Checking captcha") + if session.signup.captcha.completed: + current_app.logger.info("Captcha already completed") + return error_response(message=SignupMsg.captcha_already_completed) + captcha_verified = False if session.signup.captcha.bad_attempts >= current_app.conf.captcha_max_bad_attempts: @@ -222,6 +249,7 @@ def captcha_response(internal_response: Optional[str] = None) -> FluxData: @signup_views.route("/get-password", methods=["POST"]) @UnmarshalWith(EmptyRequest) @MarshalWith(SignupStatusResponse) +@require_not_logged_in def get_password() -> FluxData: current_app.logger.info("Password requested") if session.signup.credentials.password is None: @@ -233,6 +261,7 @@ def get_password() -> FluxData: @signup_views.route("/create-user", methods=["POST"]) @UnmarshalWith(CreateUserRequest) @MarshalWith(SignupStatusResponse) +@require_not_logged_in def create_user(use_password: bool, use_webauthn: bool) -> FluxData: current_app.logger.info("Creating user")