diff --git a/.vscode/settings.json b/.vscode/settings.json index 3f08348cb..a97b8628a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -76,4 +76,7 @@ "Werkzeug", "zxcvbn" ], + "mypy-type-checker.args": [ + "--ignore-missing-imports" + ], } \ No newline at end of file diff --git a/requirements/test_requirements.txt b/requirements/test_requirements.txt index 73933e71f..013b37c01 100644 --- a/requirements/test_requirements.txt +++ b/requirements/test_requirements.txt @@ -32,6 +32,10 @@ anyio==4.4.0 \ # httpx # starlette # watchfiles +apscheduler==3.10.4 \ + --hash=sha256:e6df071b27d9be898e486bc7940a7be50b4af2e9da7c08f0744a96d4bd4cef4a \ + --hash=sha256:fb91e8a768632a4756a585f79ec834e0e27aad5860bac7eaa523d9ccefd87661 + # via -r worker_requirements.in arabic-reshaper==3.0.0 \ --hash=sha256:3f71d5034bb694204a239a6f1ebcf323ac3c5b059de02259235e2016a1a5e2dc \ --hash=sha256:ffcd13ba5ec007db71c072f5b23f420da92ac7f268512065d49e790e62237099 @@ -1734,6 +1738,7 @@ pytz==2024.1 \ --hash=sha256:328171f4e3623139da4983451950b28e95ac706e13f3f2630a879749e7a8b319 # via # -c main.txt + # apscheduler # flask-babel # neo4j # pysaml2 @@ -1973,6 +1978,7 @@ six==1.16.0 \ --hash=sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254 # via # -c main.txt + # apscheduler # bleach # ecdsa # html5lib @@ -2077,6 +2083,7 @@ tzlocal==5.2 \ --hash=sha256:8d399205578f1a9342816409cc1e46a93ebd5755e39ea2d85334bea911bf0e6e # via # -c main.txt + # apscheduler # pyhanko ua-parser==0.18.0 \ --hash=sha256:9d94ac3a80bcb0166823956a779186c746b50ea4c9fd9bf30fdb758553c38950 \ diff --git a/requirements/worker_requirements.in b/requirements/worker_requirements.in index 86ce6b370..0f09ca02b 100644 --- a/requirements/worker_requirements.in +++ b/requirements/worker_requirements.in @@ -1,3 +1,4 @@ -r main.in -c main.txt jinja2 +apscheduler<4 # Next major version has API breaking changes \ No newline at end of file diff --git a/requirements/worker_requirements.txt 
b/requirements/worker_requirements.txt index 16517b1f8..ed6a40a1b 100644 --- a/requirements/worker_requirements.txt +++ b/requirements/worker_requirements.txt @@ -30,6 +30,10 @@ anyio==4.4.0 \ # via # -c main.txt # httpx +apscheduler==3.10.4 \ + --hash=sha256:e6df071b27d9be898e486bc7940a7be50b4af2e9da7c08f0744a96d4bd4cef4a \ + --hash=sha256:fb91e8a768632a4756a585f79ec834e0e27aad5860bac7eaa523d9ccefd87661 + # via -r worker_requirements.in arabic-reshaper==3.0.0 \ --hash=sha256:3f71d5034bb694204a239a6f1ebcf323ac3c5b059de02259235e2016a1a5e2dc \ --hash=sha256:ffcd13ba5ec007db71c072f5b23f420da92ac7f268512065d49e790e62237099 @@ -1320,6 +1324,7 @@ pytz==2024.1 \ --hash=sha256:328171f4e3623139da4983451950b28e95ac706e13f3f2630a879749e7a8b319 # via # -c main.txt + # apscheduler # neo4j # pysaml2 pyxmlsecurity[PKCS11,pkcs11]==1.0.0 \ @@ -1535,6 +1540,7 @@ six==1.16.0 \ --hash=sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254 # via # -c main.txt + # apscheduler # bleach # ecdsa # html5lib @@ -1619,6 +1625,7 @@ tzlocal==5.2 \ --hash=sha256:8d399205578f1a9342816409cc1e46a93ebd5755e39ea2d85334bea911bf0e6e # via # -c main.txt + # apscheduler # pyhanko uritools==4.0.2 \ --hash=sha256:04df2b787d0eb76200e8319382a03562fbfe4741fd66c15506b08d3b8211d573 \ diff --git a/src/eduid/common/clients/amapi_client/amapi_client.py b/src/eduid/common/clients/amapi_client/amapi_client.py index 6b022b33d..bcf6a1b6f 100644 --- a/src/eduid/common/clients/amapi_client/amapi_client.py +++ b/src/eduid/common/clients/amapi_client/amapi_client.py @@ -20,15 +20,15 @@ class AMAPIClient(GNAPClient): - def __init__(self, amapi_url: str, app, auth_data=GNAPClientAuthData, **kwargs): - super().__init__(auth_data=auth_data, app=app, **kwargs) + def __init__(self, amapi_url: str, auth_data=GNAPClientAuthData, verify_tls: bool = True, **kwargs): + super().__init__(auth_data=auth_data, verify=verify_tls, **kwargs) self.amapi_url = amapi_url def _users_base_url(self) -> str: return 
urlappend(self.amapi_url, "users") def _put(self, base_path: str, user: str, endpoint: str, body: Any) -> httpx.Response: - return self.put(urlappend(base_path, f"{user}/{endpoint}"), json=body.json()) + return self.put(url=urlappend(base_path, f"{user}/{endpoint}"), content=body.json()) def update_user_email(self, user: str, body: UserUpdateEmailRequest) -> UserUpdateResponse: ret = self._put(base_path=self._users_base_url(), user=user, endpoint="email", body=body) diff --git a/src/eduid/common/clients/gnap_client/base.py b/src/eduid/common/clients/gnap_client/base.py index eb408bdff..b9863fa63 100644 --- a/src/eduid/common/clients/gnap_client/base.py +++ b/src/eduid/common/clients/gnap_client/base.py @@ -36,6 +36,7 @@ class GNAPClientException(Exception): class GNAPClientAuthData(BaseModel): authn_server_url: str + authn_server_verify: bool = True key_name: str client_jwk: ClientJWK access: list[Union[str, Access]] = Field(default_factory=list) diff --git a/src/eduid/common/models/amapi_user.py b/src/eduid/common/models/amapi_user.py index ee7f7e78e..2f8284fe2 100644 --- a/src/eduid/common/models/amapi_user.py +++ b/src/eduid/common/models/amapi_user.py @@ -13,6 +13,7 @@ class Reason(str, Enum): USER_DECEASED = "user_deceased" + USER_DEREGISTERED = "user_deregistered" NAME_CHANGED = "name_changed" CAREGIVER_CHANGED = "caregiver_changed" READ_USER = "read_user" diff --git a/src/eduid/common/rpc/msg_relay.py b/src/eduid/common/rpc/msg_relay.py index 5fd900898..53b1d9ff7 100644 --- a/src/eduid/common/rpc/msg_relay.py +++ b/src/eduid/common/rpc/msg_relay.py @@ -99,9 +99,14 @@ class DeregisteredCauseCode(str, Enum): DECEASED = "AV" EMIGRATED = "UV" OLD_NIN = "GN" - OTHER_REASON = "AN" - TECHNICALLY_DEREGISTERED = "TA" + OLD_COORDINATION_NUMBER = "GS" + # From 2006-09-20 MISSING = "OB" + TECHNICALLY_DEREGISTERED = "TA" + ANNULLED_COORDINATION_NUMBER = "AS" + # Before 2006-09-20 + OTHER_REASON = "AN" + # From 2018-07-01 FALSE_IDENTITY = "FI" diff --git 
a/src/eduid/common/rpc/tests/test_msg_relay.py b/src/eduid/common/rpc/tests/test_msg_relay.py index 8c1c820fa..48a1c1e48 100644 --- a/src/eduid/common/rpc/tests/test_msg_relay.py +++ b/src/eduid/common/rpc/tests/test_msg_relay.py @@ -7,7 +7,7 @@ from eduid.common.config.base import CeleryConfig, MsgConfigMixin from eduid.common.config.workers import MsgConfig from eduid.common.rpc.exceptions import NoAddressFound, NoNavetData -from eduid.common.rpc.msg_relay import FullPostalAddress, MsgRelay, NavetData, RelationType +from eduid.common.rpc.msg_relay import DeregisteredCauseCode, FullPostalAddress, MsgRelay, NavetData, RelationType from eduid.workers.msg import MsgCelerySingleton from eduid.workers.msg.tasks import MessageSender @@ -39,6 +39,15 @@ def test_get_all_navet_data(self, mock_get_all_navet_data: MagicMock): res = self.msg_relay.get_all_navet_data(nin="190102031234") assert res == NavetData(**self.message_sender.get_devel_all_navet_data()) + @patch("eduid.workers.msg.tasks.get_all_navet_data.apply_async") + def test_get_all_navet_data_deceased(self, mock_get_all_navet_data: MagicMock): + mock_conf = {"get.return_value": self.message_sender.get_devel_all_navet_data(identity_number="189001019802")} + ret = Mock(**mock_conf) + mock_get_all_navet_data.return_value = ret + res = self.msg_relay.get_all_navet_data(nin="189001019802", allow_deregistered=True) + assert res.person.deregistration_information.cause_code == DeregisteredCauseCode.DECEASED + assert res == NavetData(**self.message_sender.get_devel_all_navet_data(identity_number="189001019802")) + @patch("eduid.workers.msg.tasks.get_all_navet_data.apply_async") def test_get_all_navet_data_none_response(self, mock_get_all_navet_data: MagicMock): mock_conf = {"get.return_value": None} diff --git a/src/eduid/dev-extra-modules.txt b/src/eduid/dev-extra-modules.txt new file mode 100755 index 000000000..e69de29bb diff --git a/src/eduid/userdb/user_cleaner/db.py b/src/eduid/userdb/user_cleaner/db.py new file mode 
100644 index 000000000..5dcf31339 --- /dev/null +++ b/src/eduid/userdb/user_cleaner/db.py @@ -0,0 +1,51 @@ +import logging +from datetime import datetime +from enum import Enum +from typing import Optional + +import pymongo + +from eduid.userdb.db.base import TUserDbDocument +from eduid.userdb.identity import IdentityType +from eduid.userdb.meta import CleanerType +from eduid.userdb.user import User +from eduid.userdb.userdb import UserDB, UserVar + +logger = logging.getLogger(__name__) + + +class CleanerQueueUser(User): + """ + User version to bookkeep cleaning actions. + eppn + cleaner_type + """ + + cleaner_type: CleanerType + + +class CleanerQueueDB(UserDB[CleanerQueueUser]): + def __init__(self, db_uri: str, db_name: str = "eduid_user_cleaner", collection: str = "cleaner_queue"): + super().__init__(db_uri, db_name, collection) + + indexes = { + "eppn-index-v1": {"key": [("eduPersonPrincipalName", 1)], "unique": True}, + "creation-index-v1": {"key": [("meta.created_ts", 1)], "unique": False}, + } + self.setup_indexes(indexes) + + @classmethod + def user_from_dict(cls, data: TUserDbDocument) -> CleanerQueueUser: + return CleanerQueueUser.from_dict(data) + + def get_next_user(self, cleaner_type: CleanerType) -> Optional[CleanerQueueUser]: + doc = self._coll.find_one_and_delete( + filter={"cleaner_type": cleaner_type}, sort=[("meta.created_ts", pymongo.ASCENDING)] + ) + if doc is not None: + logger.debug("Found document") + user = self.user_from_dict(doc) + return user + else: + logger.debug("No document found") + return None diff --git a/src/eduid/userdb/user_cleaner/userdb.py b/src/eduid/userdb/user_cleaner/userdb.py new file mode 100644 index 000000000..97844efe1 --- /dev/null +++ b/src/eduid/userdb/user_cleaner/userdb.py @@ -0,0 +1,16 @@ +from eduid.userdb.db.base import TUserDbDocument +from eduid.userdb.user import User +from eduid.userdb.userdb import UserDB + + +class CleanerUser(User): + pass + + +class CleanerUserDB(UserDB[CleanerUser]): + def 
__init__(self, db_uri: str, db_name: str = "eduid_user_cleaner", collection: str = "profiles"): + super().__init__(db_uri, db_name, collection) + + @classmethod + def user_from_dict(cls, data: TUserDbDocument) -> CleanerUser: + return CleanerUser.from_dict(data) diff --git a/src/eduid/userdb/userdb.py b/src/eduid/userdb/userdb.py index d68f96179..a235a866f 100644 --- a/src/eduid/userdb/userdb.py +++ b/src/eduid/userdb/userdb.py @@ -3,6 +3,7 @@ from dataclasses import dataclass from typing import Any, Generic, Mapping, Optional, TypeVar, Union +import pymongo from bson import ObjectId from bson.errors import InvalidId from pymongo import ReturnDocument @@ -19,7 +20,7 @@ UserOutOfSync, ) from eduid.userdb.identity import IdentityType -from eduid.userdb.meta import CleanerType, Meta +from eduid.userdb.meta import Meta from eduid.userdb.user import User logger = logging.getLogger(__name__) @@ -96,26 +97,6 @@ def get_user_by_id(self, user_id: Union[str, ObjectId]) -> Optional[UserVar]: return None return self._get_user_by_attr("_id", user_id) - def _get_users_by_aggregate(self, match: dict[str, Any], sort: dict[str, Any], limit: int) -> list[UserVar]: - users = self._get_documents_by_aggregate(match=match, sort=sort, limit=limit) - return self._users_from_documents(users) - - def get_uncleaned_verified_users( - self, cleaned_type: CleanerType, identity_type: IdentityType, limit: int - ) -> list[UserVar]: - match = { - "identities": { - "$elemMatch": { - "verified": True, - "identity_type": identity_type.value, - } - } - } - - type_filter = f"meta.cleaned.{cleaned_type.value}" - sort = {type_filter: 1} - return self._get_users_by_aggregate(match=match, sort=sort, limit=limit) - def get_verified_users_count(self, identity_type: Optional[IdentityType] = None) -> int: spec: dict[str, Any] spec = { @@ -369,6 +350,20 @@ def save(self, user: User) -> UserSaveResult: return UserSaveResult(success=bool(result)) + def get_unterminated_users_with_nin(self) -> list[User]: + match = 
{ + "identities": { + "$elemMatch": { + "verified": True, + "identity_type": IdentityType.NIN.value, + } + }, + "terminated": {"$exists": False}, + } + + users = self._get_documents_by_aggregate(match=match) + return self._users_from_documents(users) + def unverify_mail_aliases(self, user_id: ObjectId, mail_aliases: Optional[list[dict[str, Any]]]) -> int: count = 0 if mail_aliases is None: diff --git a/src/eduid/workers/am/ams/__init__.py b/src/eduid/workers/am/ams/__init__.py index 5a0bddb2f..33296b710 100644 --- a/src/eduid/workers/am/ams/__init__.py +++ b/src/eduid/workers/am/ams/__init__.py @@ -26,6 +26,7 @@ from eduid.userdb.reset_password import ResetPasswordUserDB from eduid.userdb.security import SecurityUserDB from eduid.userdb.signup import SignupUserDB +from eduid.userdb.user_cleaner.userdb import CleanerUserDB from eduid.workers.am.ams.common import AttributeFetcher logger = get_task_logger(__name__) @@ -310,3 +311,11 @@ class eduid_bankid(AttributeFetcher): @classmethod def get_user_db(cls, uri: str) -> BankIDProofingUserDB: return BankIDProofingUserDB(uri) + + +class eduid_job_runner(AttributeFetcher): + whitelist_set_attrs = ["terminated"] # skv cleaner checks status of registered persons + + @classmethod + def get_user_db(cls, uri: str) -> CleanerUserDB: + return CleanerUserDB(uri) diff --git a/src/eduid/workers/amapi/routers/users.py b/src/eduid/workers/amapi/routers/users.py index 81a42c454..a3dd96f89 100644 --- a/src/eduid/workers/amapi/routers/users.py +++ b/src/eduid/workers/amapi/routers/users.py @@ -57,5 +57,5 @@ async def on_put_meta_cleaned(req: ContextRequest, data: UserUpdateMetaCleanedRe @users_router.put("/{eppn}/terminate", response_model=UserUpdateResponse) async def on_terminate_user(req: ContextRequest, data: UserUpdateTerminateRequest, eppn: str): - req.app.context.logger.info(f"Terminate user {eppn} email") + req.app.context.logger.info(f"Terminate user {eppn}") return update_user(req=req, eppn=eppn, data=data) diff --git 
a/src/eduid/workers/job_runner/__init__.py b/src/eduid/workers/job_runner/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/eduid/workers/job_runner/app.py b/src/eduid/workers/job_runner/app.py new file mode 100644 index 000000000..07c6e60c3 --- /dev/null +++ b/src/eduid/workers/job_runner/app.py @@ -0,0 +1,45 @@ +from contextlib import asynccontextmanager +from typing import Callable, Optional + +from fastapi import FastAPI + +from eduid.common.config.parsers import load_config +from eduid.workers.job_runner.config import JobRunnerConfig +from eduid.workers.job_runner.context import Context +from eduid.workers.job_runner.scheduler import JobScheduler +from eduid.workers.job_runner.status import status_router + + +class JobRunner(FastAPI): + scheduler: JobScheduler = JobScheduler(timezone="UTC") + + def __init__( + self, name: str = "job_runner", test_config: Optional[dict] = None, lifespan: Optional[Callable] = None + ): + self.config = load_config(typ=JobRunnerConfig, app_name=name, ns="worker", test_config=test_config) + super().__init__(root_path=self.config.application_root, lifespan=lifespan) + + self.context = Context(config=self.config) + self.context.logger.info(f"Starting {name} worker: {self.context.worker_name}") + + +@asynccontextmanager +async def lifespan(app: JobRunner): + app.context.logger.info("Starting scheduler...") + app.scheduler.start() + yield + app.context.logger.info("Stopping scheduler...") + app.scheduler.shutdown() + + +def init_app(name: str = "job_runner", test_config: Optional[dict] = None) -> JobRunner: + app = JobRunner(name, test_config, lifespan=lifespan) + app.context.logger.info(app.config) + + app.include_router(status_router) + + # schedule jobs defined in config + app.scheduler.schedule_jobs(app.context) + + app.context.logger.info("app running...") + return app diff --git a/src/eduid/workers/job_runner/config.py b/src/eduid/workers/job_runner/config.py new file mode 100644 index 
000000000..b48164cd3 --- /dev/null +++ b/src/eduid/workers/job_runner/config.py @@ -0,0 +1,67 @@ +import logging +from datetime import datetime, tzinfo +from typing import Any, NewType, Optional, Union + +from pydantic import BaseModel, ConfigDict, field_validator, model_validator + +from eduid.common.clients.gnap_client.base import GNAPClientAuthData +from eduid.common.config.base import AmConfigMixin, LoggingConfigMixin, MsgConfigMixin, RootConfig, StatsConfigMixin +from eduid.common.utils import removesuffix + +logger = logging.getLogger(__name__) + + +class JobCronConfig(BaseModel): + """ + Cron configuration for a single job. + https://apscheduler.readthedocs.io/en/stable/modules/triggers/cron.html#module-apscheduler.triggers.cron + """ + + model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") + + year: Optional[Union[int, str]] = None + month: Optional[Union[int, str]] = None + day: Optional[Union[int, str]] = None + week: Optional[Union[int, str]] = None + day_of_week: Optional[Union[int, str]] = None + hour: Optional[Union[int, str]] = None + minute: Optional[Union[int, str]] = None + second: Optional[Union[int, str]] = None + start_date: Optional[Union[datetime, str]] = None + end_date: Optional[Union[datetime, str]] = None + timezone: Optional[Union[tzinfo, str]] = None + jitter: Optional[int] = None + + @model_validator(mode="before") + @classmethod + def at_least_one_datetime_value(cls, data: Any) -> Any: + if isinstance(data, dict): + need_one_of = ["year", "month", "day", "week", "day_of_week", "hour", "minute", "second"] + assert len(data.keys() & need_one_of), f"At least one of {need_one_of} must be set" + return data + + +EnvironmentOrWorkerName = NewType("EnvironmentOrWorkerName", str) +JobName = NewType("JobName", str) +JobsConfig = NewType("JobsConfig", dict[EnvironmentOrWorkerName, dict[JobName, JobCronConfig]]) + + +class JobRunnerConfig(RootConfig, LoggingConfigMixin, StatsConfigMixin, MsgConfigMixin, AmConfigMixin): + """ + 
Configuration for the job runner service. + """ + + application_root: str = "" + log_format: str = "{asctime} | {levelname:7} | {hostname} | {name:35} | {module:10} | {message}" + mongo_uri: str = "" + status_cache_seconds: int = 10 + jobs: Optional[JobsConfig] = None + gnap_auth_data: GNAPClientAuthData + + @field_validator("application_root") + @classmethod + def application_root_must_not_end_with_slash(cls, v: str): + if v.endswith("/"): + logger.warning(f"application_root should not end with slash ({v})") + v = removesuffix(v, "/") + return v diff --git a/src/eduid/workers/job_runner/context.py b/src/eduid/workers/job_runner/context.py new file mode 100644 index 000000000..babf359ac --- /dev/null +++ b/src/eduid/workers/job_runner/context.py @@ -0,0 +1,45 @@ +import logging +from os import environ + +from eduid.common.clients.amapi_client.amapi_client import AMAPIClient +from eduid.common.fastapi.log import init_logging +from eduid.common.rpc.am_relay import AmRelay +from eduid.common.rpc.msg_relay import MsgRelay +from eduid.userdb.user_cleaner.db import CleanerQueueDB +from eduid.userdb.user_cleaner.userdb import CleanerUserDB +from eduid.userdb.userdb import AmDB +from eduid.workers.job_runner.config import JobRunnerConfig + + +class Context: + def __init__(self, config: JobRunnerConfig): + self.name = config.app_name + self.config = config + + worker_name = environ.get("WORKER_NAME", None) + if worker_name is None: + raise RuntimeError("Environment variable WORKER_NAME needs to be set") + self.worker_name = worker_name + + # Setup logging + init_logging(self.name, self.config) + self.logger = logging.getLogger("user_cleaner") + self.logger.info("Logging initialized") + + # Setup databases + self.db = AmDB(db_uri=self.config.mongo_uri) + self.logger.info(f"Database {self.db} initialized") + + self.cleaner_queue = CleanerQueueDB(db_uri=self.config.mongo_uri) + self.logger.info(f"Database {self.cleaner_queue} initialized") + + self.private_db = 
CleanerUserDB(db_uri=self.config.mongo_uri) + self.logger.info(f"Database {self.private_db} initialized") + + # Setup MsgRelay + self.msg_relay = MsgRelay(self.config) + self.logger.info(f"MsgRelay {self.msg_relay} initialized") + + # Setup AmRelay + self.am_relay = AmRelay(self.config) + self.logger.info(f"AmRelay {self.am_relay} initialized") diff --git a/src/eduid/workers/job_runner/helpers.py b/src/eduid/workers/job_runner/helpers.py new file mode 100644 index 000000000..81168c52a --- /dev/null +++ b/src/eduid/workers/job_runner/helpers.py @@ -0,0 +1,22 @@ +from eduid.userdb.user import User +from eduid.userdb.user_cleaner.userdb import CleanerUser +from eduid.workers.job_runner.context import Context + + +def save_and_sync_user(context: Context, user: User) -> bool: + """ + Save to private userdb and propagate change to central user db. + + May raise UserOutOfSync exception + + :param user: the modified user + """ + private_user = CleanerUser.from_user(user, context.private_db) + context.private_db.save(private_user) + context.logger.debug( + f"Saving user {private_user} to private userdb {context.private_db} (is_in_database: {user.meta.is_in_database})" + ) + + # Sync to central userdb + context.logger.debug(f"Syncing {user} to central userdb") + return context.am_relay.request_user_sync(user) diff --git a/src/eduid/workers/job_runner/jobs/skv.py b/src/eduid/workers/job_runner/jobs/skv.py new file mode 100644 index 000000000..cf95e9252 --- /dev/null +++ b/src/eduid/workers/job_runner/jobs/skv.py @@ -0,0 +1,68 @@ +from eduid.common.misc.timeutil import utc_now +from eduid.common.rpc.exceptions import MsgTaskFailed +from eduid.common.rpc.msg_relay import DeregisteredCauseCode, NavetData +from eduid.userdb.exceptions import UserDoesNotExist +from eduid.userdb.meta import CleanerType +from eduid.userdb.user import User +from eduid.userdb.user_cleaner.db import CleanerQueueUser +from eduid.workers.job_runner.context import Context +from 
eduid.workers.job_runner.helpers import save_and_sync_user + + +def gather_skv_users(context: Context): + """ + Gather and queue all users that should be checked against SKV APIs + + """ + context.logger.debug("gathering users to check") + users: list[User] = context.db.get_unterminated_users_with_nin() + context.logger.debug(f"gathered {len(users)} users to check") + for user in users: + try: + context.cleaner_queue.get_user_by_eppn(user.eppn) + context.logger.debug(f"{user.eppn} already in queue") + except UserDoesNotExist: + queue_user: CleanerQueueUser = CleanerQueueUser( + eppn=user.eppn, cleaner_type=CleanerType.SKV, identities=user.identities + ) + context.cleaner_queue.save(queue_user) + + +def check_skv_users(context: Context): + """ + Check all users that should be checked against SKV APIs + """ + context.logger.debug("checking users") + user = context.cleaner_queue.get_next_user(CleanerType.SKV) + if user is not None: + context.logger.debug(f"Checking if user with eppn {user.eppn} should be terminated") + assert user.identities.nin is not None # Please mypy + try: + navet_data: NavetData = context.msg_relay.get_all_navet_data( + nin=user.identities.nin.number, allow_deregistered=True + ) + context.logger.debug(f"Navet data: {navet_data}") + + if navet_data.person.is_deregistered(): + cause = navet_data.person.deregistration_information.cause_code + if cause is DeregisteredCauseCode.EMIGRATED: + context.logger.debug(f"User with eppn {user.eppn} has emigrated and should not be terminated") + else: + context.logger.debug(f"User with eppn {user.eppn} should be terminated") + terminate_user(context, user) + else: + context.logger.debug(f"User with eppn {user.eppn} is still registered") + except MsgTaskFailed: + context.logger.error(f"Failed to get Navet data for user with eppn {user.eppn}") + # The user will be requeued for a new check by the next run of gather_skv_users + else: + context.logger.debug("Nothing to do") + + +def terminate_user(context: 
Context, queue_user: CleanerQueueUser): + """ + Terminate a user + """ + user = context.db.get_user_by_eppn(queue_user.eppn) + user.terminated = utc_now() + save_and_sync_user(context, user) diff --git a/src/eduid/workers/job_runner/run.py b/src/eduid/workers/job_runner/run.py new file mode 100644 index 000000000..d5c2a6980 --- /dev/null +++ b/src/eduid/workers/job_runner/run.py @@ -0,0 +1,10 @@ +import os +from sys import stderr + +from eduid.workers.job_runner.app import init_app + +DEBUG = os.environ.get("EDUID_APP_DEBUG", False) +if DEBUG: + stderr.writelines("----- WARNING! EDUID_APP_DEBUG is enabled -----\n") + +app = init_app() diff --git a/src/eduid/workers/job_runner/scheduler.py b/src/eduid/workers/job_runner/scheduler.py new file mode 100644 index 000000000..44d1e1ea3 --- /dev/null +++ b/src/eduid/workers/job_runner/scheduler.py @@ -0,0 +1,57 @@ +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +from eduid.common.config.exceptions import BadConfiguration +from eduid.workers.job_runner.config import EnvironmentOrWorkerName, JobCronConfig +from eduid.workers.job_runner.context import Context +from eduid.workers.job_runner.jobs.skv import check_skv_users, gather_skv_users + + +class JobScheduler(AsyncIOScheduler): + + def schedule_jobs(self, context: Context): + """ + Schedule all jobs configured for host or environment + """ + + environment = EnvironmentOrWorkerName(context.config.environment) + worker_name = EnvironmentOrWorkerName(context.worker_name) + + if context.config.jobs is None: + context.logger.info("No jobs configured in config") + return + + jobs_config = context.config.jobs + context.logger.debug(f"jobs_config: {jobs_config}") + + jobs: dict = {} + + # Gather jobs for current environment and worker in a dictionary + if environment in jobs_config: + context.logger.debug(f"Setting up jobs for environment {environment}") + context.logger.debug(f"Setting up jobs {jobs_config[environment]}") + jobs.update(jobs_config[environment]) + + 
if worker_name in jobs_config: + context.logger.debug(f"Setting up jobs for worker {worker_name}") + context.logger.debug(f"Setting up jobs {jobs_config[worker_name]}") + jobs.update(jobs_config[worker_name]) + + if len(jobs) == 0: + context.logger.info(f"No jobs configured for {worker_name} running {environment}") + return + + context.logger.info(f"Setting up jobs {jobs} for {worker_name} running {environment}") + + # Add all configured jobs to the scheduler + for job in jobs: + cron_settings: JobCronConfig = jobs[job] + params = cron_settings.model_dump() + context.logger.info(f"Setting up job {job} with parameters {params}") + + match job: + case "gather_skv_users": + self.add_job(gather_skv_users, "cron", **params, args=(context,)) + case "check_skv_users": + self.add_job(check_skv_users, "cron", **params, args=(context,)) + case _: + raise BadConfiguration("unknown job in config") diff --git a/src/eduid/workers/job_runner/status.py b/src/eduid/workers/job_runner/status.py new file mode 100644 index 000000000..1249ae7e9 --- /dev/null +++ b/src/eduid/workers/job_runner/status.py @@ -0,0 +1,62 @@ +from os import environ +from typing import Mapping + +from fastapi import APIRouter, Request, Response +from pydantic import BaseModel + +from eduid.common.fastapi.context_request import ContextRequest, ContextRequestRoute +from eduid.common.fastapi.utils import ( + check_restart, + get_cached_response, + log_failure_info, + reset_failure_info, + set_cached_response, +) + +status_router = APIRouter(route_class=ContextRequestRoute, prefix="/status") + + +class StatusResponse(BaseModel): + status: str + hostname: str + reason: str + + +def check_mongo(request: ContextRequest): + db = request.app.context.db + try: + db.is_healthy() + reset_failure_info(request, "_check_mongo") + return True + except Exception as exc: + log_failure_info(request, "_check_mongo", msg="Mongodb health check failed", exc=exc) + check_restart("_check_mongo", restart=0, terminate=120) + return 
False + + +def check_scheduler(request: ContextRequest): + scheduler = request.app.scheduler + return scheduler.running + + +@status_router.get("/healthy", response_model=StatusResponse) +async def healthy(request: ContextRequest, response: Response) -> Mapping: + status = get_cached_response(request, response, key="health_check") + if not status: + status = { + "status": f"STATUS_FAIL_{request.app.context.name}_", + "hostname": environ.get("HOSTNAME", "UNKNOWN"), + } + reasons = [] + if not check_mongo(request): + reasons.append("mongodb check failed") + request.app.context.logger.warning("MongoDB health check failed") + elif not check_scheduler(request): + reasons.append("scheduler check failed") + request.app.context.logger.warning("APScheduler health check failed") + else: + status["status"] = f"STATUS_OK_{request.app.context.name}_" + reasons.append("mongodb check succeeded") + status["reason"] = ", ".join(reasons) + set_cached_response(request, response, key="health_check", data=status) + return status diff --git a/src/eduid/workers/job_runner/testing.py b/src/eduid/workers/job_runner/testing.py new file mode 100644 index 000000000..818898256 --- /dev/null +++ b/src/eduid/workers/job_runner/testing.py @@ -0,0 +1,49 @@ +import os +import unittest +from typing import Any + +import pkg_resources +from jwcrypto.jwk import JWK + +from eduid.common.config.parsers import load_config +from eduid.userdb.testing import MongoTemporaryInstance +from eduid.userdb.user_cleaner.db import CleanerQueueDB +from eduid.workers.job_runner.config import JobRunnerConfig +from eduid.workers.job_runner.context import Context + + +class BaseDBTestCase(unittest.TestCase): + """ + Base test case that sets up a temporary database for testing. 
+ """ + + mongodb_instance: MongoTemporaryInstance + mongo_uri: str + + @classmethod + def setUpClass(cls): + cls.mongodb_instance = MongoTemporaryInstance.get_instance() + cls.mongo_uri = cls.mongodb_instance.uri + + +class CleanerQueueTestCase(BaseDBTestCase): + """ + Base class for tests of the cleaner queue. + """ + + @classmethod + def setUpClass(cls) -> None: + return super().setUpClass() + + def setUp(self) -> None: + if "EDUID_CONFIG_YAML" not in os.environ: + os.environ["EDUID_CONFIG_YAML"] = "YAML_CONFIG_NOT_USED" + + self.datadir = pkg_resources.resource_filename(__name__, "tests/data") + + self.cleaner_queue_db = CleanerQueueDB(db_uri=self.mongo_uri) + + def tearDown(self) -> None: + super().tearDown() + if self.cleaner_queue_db: + self.cleaner_queue_db._drop_whole_collection() diff --git a/src/eduid/workers/job_runner/tests/test_user_cleaner.py b/src/eduid/workers/job_runner/tests/test_user_cleaner.py new file mode 100644 index 000000000..7cb4abf4b --- /dev/null +++ b/src/eduid/workers/job_runner/tests/test_user_cleaner.py @@ -0,0 +1,51 @@ +import time + +from eduid.userdb.fixtures.users import UserFixtures +from eduid.userdb.meta import CleanerType +from eduid.userdb.user_cleaner.db import CleanerQueueUser +from eduid.workers.job_runner.testing import CleanerQueueTestCase + + +class TestCleanerQueueDB(CleanerQueueTestCase): + users = UserFixtures() + + def setUp(self): + super().setUp() + + def test_queue_order(self): + first = self.users.mocked_user_standard + second = self.users.mocked_user_standard_2 + first_user: CleanerQueueUser = CleanerQueueUser( + eppn=first.eppn, cleaner_type=CleanerType.SKV, identities=first.identities + ) + self.cleaner_queue_db.save(first_user) + second_user: CleanerQueueUser = CleanerQueueUser( + eppn=second.eppn, cleaner_type=CleanerType.SKV, identities=second.identities + ) + self.cleaner_queue_db.save(second_user) + + first_user_from_db = self.cleaner_queue_db.get_next_user(CleanerType.SKV) + second_user_from_db = 
self.cleaner_queue_db.get_next_user(CleanerType.SKV) + + self.assertEqual(first_user_from_db.eppn, first.eppn) + self.assertEqual(second_user_from_db.eppn, second_user.eppn) + + def test_mixed_queue(self): + first = self.users.mocked_user_standard + second = self.users.mocked_user_standard_2 + ladok_queue_user: CleanerQueueUser = CleanerQueueUser( + eppn=first.eppn, cleaner_type=CleanerType.LADOK, identities=first.identities + ) + self.cleaner_queue_db.save(ladok_queue_user) + skv_queue_user: CleanerQueueUser = CleanerQueueUser( + eppn=second.eppn, cleaner_type=CleanerType.SKV, identities=second.identities + ) + self.cleaner_queue_db.save(skv_queue_user) + + first_user_from_db = self.cleaner_queue_db.get_next_user(CleanerType.SKV) + second_user_from_db = self.cleaner_queue_db.get_next_user(CleanerType.SKV) + third_user_from_db = self.cleaner_queue_db.get_next_user(CleanerType.LADOK) + + self.assertEqual(first_user_from_db.eppn, skv_queue_user.eppn) + self.assertIsNone(second_user_from_db) + self.assertEqual(third_user_from_db.eppn, ladok_queue_user.eppn) diff --git a/src/eduid/workers/msg/tasks.py b/src/eduid/workers/msg/tasks.py index 7571bcf5f..f83c4dec9 100644 --- a/src/eduid/workers/msg/tasks.py +++ b/src/eduid/workers/msg/tasks.py @@ -234,23 +234,30 @@ def get_devel_relations() -> OrderedDict[str, Any]: def get_all_navet_data(self, identity_number: str) -> Optional[OrderedDict[str, Any]]: # Only log the message if devel_mode is enabled if MsgCelerySingleton.worker_config.devel_mode is True: - return self.get_devel_all_navet_data() + return self.get_devel_all_navet_data(identity_number) data = self._get_navet_data(identity_number) return navet_get_all_data(data) @staticmethod - def get_devel_all_navet_data() -> OrderedDict[str, Any]: + def get_devel_all_navet_data(identity_number: str = "190102031234") -> OrderedDict[str, Any]: """ Return a dict with devel data + Birthdates preceding 1900 are shown as deceased for testing purposes """ + + 
deregistration_information = {} + birth_year = int(identity_number[0:4]) + if birth_year < 1900: + deregistration_information = {"date": "20220315", "causeCode": "AV"} + result = OrderedDict( { "CaseInformation": {"lastChanged": "20170904141659"}, "Person": { - "PersonId": {"NationalIdentityNumber": "197609272393"}, + "PersonId": {"NationalIdentityNumber": identity_number}, "ReferenceNationalIdentityNumber": "", - "DeregistrationInformation": {}, + "DeregistrationInformation": deregistration_information, "Name": {"GivenNameMarking": "20", "GivenName": "Testaren Test", "Surname": "Testsson"}, "PostalAddresses": { "OfficialAddress": {"Address2": "ÖRGATAN 79 LGH 10", "PostalCode": "12345", "City": "LANDET"}