From 5a92bc8f2847f1ba3d96f934759601e3b5c5fc96 Mon Sep 17 00:00:00 2001 From: Liam Keegan Date: Tue, 22 Oct 2024 10:26:39 +0200 Subject: [PATCH] Store job information - add Job model and /admin/jobs endpoint - start/end times, status and error messages are now stored for each job - admins can view the list of jobs - resolves #20 --- backend/src/predicTCR_server/app.py | 39 +++++++- backend/src/predicTCR_server/model.py | 109 ++++++++++++++-------- backend/tests/helpers/flask_test_utils.py | 2 + backend/tests/test_app.py | 59 +++++++++--- frontend/src/components/JobsTable.vue | 62 ++++++++++++ frontend/src/utils/types.ts | 9 ++ frontend/src/views/AdminView.vue | 4 + runner/docker-compose.yml | 2 + runner/src/predicTCR_runner/runner.py | 55 ++++++----- runner/tests/test_runner.py | 8 +- 10 files changed, 267 insertions(+), 82 deletions(-) create mode 100644 frontend/src/components/JobsTable.vue diff --git a/backend/src/predicTCR_server/app.py b/backend/src/predicTCR_server/app.py index 07df9d4..0822a1e 100644 --- a/backend/src/predicTCR_server/app.py +++ b/backend/src/predicTCR_server/app.py @@ -13,10 +13,13 @@ from flask_jwt_extended import JWTManager from flask_cors import cross_origin from predicTCR_server.logger import get_logger +from predicTCR_server.utils import timestamp_now from predicTCR_server.model import ( db, Sample, User, + Job, + Status, Settings, add_new_user, add_new_runner_user, @@ -281,6 +284,16 @@ def admin_users(): ) return jsonify(users=[user.as_dict() for user in users]) + @app.route("/api/admin/jobs", methods=["GET"]) + @jwt_required() + def admin_jobs(): + if not current_user.is_admin: + return jsonify(message="Admin account required"), 400 + jobs = ( + db.session.execute(db.select(Job).order_by(db.desc(Job.id))).scalars().all() + ) + return jsonify(jobs=[job.as_dict() for job in jobs]) + @app.route("/api/admin/runner_token", methods=["GET"]) @jwt_required() def admin_runner_token(): @@ -305,7 +318,17 @@ def runner_request_job(): sample_id = request_job() if sample_id is None: return jsonify(message="No job available"), 204 - return {"sample_id": sample_id} + new_job = Job( + id=None, + sample_id=sample_id, + timestamp_start=timestamp_now(), + timestamp_end=0, + status=Status.RUNNING, + error_message="", + ) + db.session.add(new_job) + db.session.commit() + return {"job_id": new_job.id, "sample_id": sample_id} @app.route("/api/runner/result", methods=["POST"]) @cross_origin() @@ -317,6 +340,9 @@ def runner_result(): sample_id = form_as_dict.get("sample_id", None) if sample_id is None: return jsonify(message="Missing key: sample_id"), 400 + job_id = form_as_dict.get("job_id", None) + if job_id is None: + return jsonify(message="Missing key: job_id"), 400 success = form_as_dict.get("success", None) if success is None or success.lower() not in ["true", "false"]: logger.info(" -> missing success key") @@ -328,12 +354,14 @@ def runner_result(): return jsonify(message="Result has success=True but no file"), 400 runner_hostname = form_as_dict.get("runner_hostname", "") logger.info( - f"Result upload for '{sample_id}' from runner {current_user.email} / {runner_hostname}" + f"Job '{job_id}' uploaded result for '{sample_id}' from runner {current_user.email} / {runner_hostname}" ) - error_message = form_as_dict.get("error_message", None) - if error_message is not None: + error_message = form_as_dict.get("error_message", "") + if error_message != "": logger.info(f" -> error message: {error_message}") - message, code = process_result(sample_id, success, zipfile) + message, code = process_result( + int(job_id), int(sample_id), success, error_message, zipfile + ) return jsonify(message=message), code with app.app_context(): @@ -341,6 +369,7 @@ def runner_result(): if db.session.get(Settings, 1) is None: db.session.add( Settings( + id=None, default_personal_submission_quota=10, default_personal_submission_interval_mins=30, global_quota=1000, diff --git a/backend/src/predicTCR_server/model.py b/backend/src/predicTCR_server/model.py index 827d44c..258868c 100644 --- a/backend/src/predicTCR_server/model.py +++ b/backend/src/predicTCR_server/model.py @@ -2,12 +2,14 @@ import re import flask -from enum import Enum +import enum import argon2 import pathlib from flask_sqlalchemy import SQLAlchemy +from sqlalchemy.orm import DeclarativeBase, MappedAsDataclass, Mapped, mapped_column from werkzeug.datastructures import FileStorage from sqlalchemy.inspection import inspect +from sqlalchemy import Integer, String, Boolean, Enum from dataclasses import dataclass from predicTCR_server.email import send_email from predicTCR_server.settings import predicTCR_url @@ -20,12 +22,17 @@ decode_password_reset_token, ) -db = SQLAlchemy() + +class Base(DeclarativeBase, MappedAsDataclass): + pass + + +db = SQLAlchemy(model_class=Base) ph = argon2.PasswordHasher() logger = get_logger() -class Status(str, Enum): +class Status(str, enum.Enum): QUEUED = "queued" RUNNING = "running" COMPLETED = "completed" @@ -34,34 +41,45 @@ class Status(str, Enum): @dataclass class Settings(db.Model): - id: int = db.Column(db.Integer, primary_key=True) - default_personal_submission_quota: int = db.Column(db.Integer, nullable=False) - default_personal_submission_interval_mins: int = db.Column( - db.Integer, nullable=False + id: Mapped[int] = mapped_column(Integer, primary_key=True) + default_personal_submission_quota: Mapped[int] = mapped_column( + Integer, nullable=False + ) + default_personal_submission_interval_mins: Mapped[int] = mapped_column( + Integer, nullable=False ) - global_quota: int = db.Column(db.Integer, nullable=False) - tumor_types: str = db.Column(db.String, nullable=False) - sources: str = db.Column(db.String, nullable=False) - csv_required_columns: str = db.Column(db.String, nullable=False) + global_quota: Mapped[int] = mapped_column(Integer, nullable=False) + tumor_types: Mapped[str] = mapped_column(String, nullable=False) + sources: Mapped[str] = mapped_column(String, nullable=False) + csv_required_columns: Mapped[str] = mapped_column(String, nullable=False) def as_dict(self): - return { - c: getattr(self, c) - for c in inspect(self).attrs.keys() - if c != "password_hash" - } + return {c: getattr(self, c) for c in inspect(self).attrs.keys()} + + +@dataclass +class Job(db.Model): + id: Mapped[int] = mapped_column(Integer, primary_key=True) + sample_id: Mapped[int] = mapped_column(Integer, nullable=False) + timestamp_start: Mapped[int] = mapped_column(Integer, nullable=False) + timestamp_end: Mapped[int] = mapped_column(Integer, nullable=False) + status: Mapped[Status] = mapped_column(Enum(Status), nullable=False) + error_message: Mapped[str] = mapped_column(String, nullable=False) + + def as_dict(self): + return {c: getattr(self, c) for c in inspect(self).attrs.keys()} @dataclass class Sample(db.Model): - id: int = db.Column(db.Integer, primary_key=True) - email: str = db.Column(db.String(256), nullable=False) - name: str = db.Column(db.String(128), nullable=False) - tumor_type: str = db.Column(db.String(128), nullable=False) - source: str = db.Column(db.String(128), nullable=False) - timestamp: int = db.Column(db.Integer, nullable=False) - status: Status = db.Column(db.Enum(Status), nullable=False) - has_results_zip: bool = db.Column(db.Boolean, nullable=False) + id: Mapped[int] = mapped_column(Integer, primary_key=True) + email: Mapped[str] = mapped_column(String(256), nullable=False) + name: Mapped[str] = mapped_column(String(128), nullable=False) + tumor_type: Mapped[str] = mapped_column(String(128), nullable=False) + source: Mapped[str] = mapped_column(String(128), nullable=False) + timestamp: Mapped[int] = mapped_column(Integer, nullable=False) + status: Mapped[Status] = mapped_column(Enum(Status), nullable=False) + has_results_zip: Mapped[bool] = mapped_column(Boolean, nullable=False) def _base_path(self) -> pathlib.Path: data_path = flask.current_app.config["PREDICTCR_DATA_PATH"] @@ -79,17 +97,17 @@ def result_file_path(self) -> pathlib.Path: @dataclass class User(db.Model): - id: int = db.Column(db.Integer, primary_key=True) - email: str = db.Column(db.Text, nullable=False, unique=True) - password_hash: str = db.Column(db.Text, nullable=False) - activated: bool = db.Column(db.Boolean, nullable=False) - enabled: bool = db.Column(db.Boolean, nullable=False) - quota: int = db.Column(db.Integer, nullable=False) - submission_interval_minutes: int = db.Column(db.Integer, nullable=False) - last_submission_timestamp: int = db.Column(db.Integer, nullable=False) - is_admin: bool = db.Column(db.Boolean, nullable=False) - is_runner: bool = db.Column(db.Boolean, nullable=False) - full_results: bool = db.Column(db.Boolean, nullable=False) + id: int = mapped_column(Integer, primary_key=True) + email: str = mapped_column(String, nullable=False, unique=True) + password_hash: str = mapped_column(String, nullable=False) + activated: bool = mapped_column(Boolean, nullable=False) + enabled: bool = mapped_column(Boolean, nullable=False) + quota: int = mapped_column(Integer, nullable=False) + submission_interval_minutes: int = mapped_column(Integer, nullable=False) + last_submission_timestamp: int = mapped_column(Integer, nullable=False) + is_admin: bool = mapped_column(Boolean, nullable=False) + is_runner: bool = mapped_column(Boolean, nullable=False) + full_results: bool = mapped_column(Boolean, nullable=False) def set_password_nocheck(self, new_password: str): self.password_hash = ph.hash(new_password) @@ -145,17 +163,26 @@ def request_job() -> int | None: def process_result( - sample_id: str, success: bool, result_zip_file: FileStorage | None + job_id: int, + sample_id: int, + success: bool, + error_message: str, + result_zip_file: FileStorage | None, ) -> tuple[str, int]: - sample = db.session.execute( - db.select(Sample).filter_by(id=sample_id) - ).scalar_one_or_none() + sample = db.session.get(Sample, sample_id) if sample is None: logger.warning(f" --> Unknown sample id {sample_id}") return f"Unknown sample id {sample_id}", 400 + job = db.session.get(Job, job_id) + if job is None: + logger.warning(f" --> Unknown job id {job_id}") + return f"Unknown job id {job_id}", 400 + job.timestamp_end = timestamp_now() if success is False: sample.has_results_zip = False sample.status = Status.FAILED + job.status = Status.FAILED + job.error_message = error_message db.session.commit() return "Result processed", 200 if result_zip_file is None: @@ -165,6 +192,7 @@ def process_result( result_zip_file.save(sample.result_file_path()) sample.has_results_zip = True sample.status = Status.COMPLETED + job.status = Status.COMPLETED db.session.commit() return "Result processed", 200 @@ -244,6 +272,7 @@ def add_new_user(email: str, password: str, is_admin: bool) -> tuple[str, int]: try: db.session.add( User( + id=None, email=email, password_hash=ph.hash(password), activated=False, @@ -282,6 +311,7 @@ def add_new_runner_user() -> User | None: runner_name = f"runner{runner_number}" db.session.add( User( + id=None, email=runner_name, password_hash="", activated=False, @@ -419,6 +449,7 @@ def add_new_sample( settings = db.session.get(Settings, 1) settings.global_quota -= 1 new_sample = Sample( + id=None, email=email, name=name, tumor_type=tumor_type, diff --git a/backend/tests/helpers/flask_test_utils.py b/backend/tests/helpers/flask_test_utils.py index 405cb5d..a79e1b9 100644 --- a/backend/tests/helpers/flask_test_utils.py +++ b/backend/tests/helpers/flask_test_utils.py @@ -14,6 +14,7 @@ def add_test_users(app): email = f"{name}@abc.xy" db.session.add( User( + id=None, email=email, password_hash=ph.hash(name), activated=True, @@ -46,6 +47,7 @@ def add_test_samples(app, data_path: pathlib.Path): with open(f"{ref_dir}/input.{input_file_type}", "w") as f: f.write(input_file_type) new_sample = Sample( + id=None, email="user@abc.xy", name=name, tumor_type=f"tumor_type{sample_id}", diff --git a/backend/tests/test_app.py b/backend/tests/test_app.py index dfddbfd..bc6edc9 100644 --- a/backend/tests/test_app.py +++ b/backend/tests/test_app.py @@ -210,12 +210,13 @@ def test_result_invalid(client): assert "No results available" in response.json["message"] -def _upload_result(client, result_zipfile: pathlib.Path, sample_id: int): +def _upload_result(client, result_zipfile: pathlib.Path, job_id: int, sample_id: int): headers = _get_auth_headers(client, "runner@abc.xy", "runner") with open(result_zipfile, "rb") as f: response = client.post( "/api/runner/result", data={ + "job_id": job_id, "sample_id": sample_id, "success": True, "file": (io.BytesIO(f.read()), result_zipfile.name), @@ -225,19 +226,57 @@ def _upload_result(client, result_zipfile: pathlib.Path, sample_id: int): return response -def test_result_valid(client, result_zipfile): - headers = _get_auth_headers(client, "user@abc.xy", "user") - sample_id = 1 - assert _upload_result(client, result_zipfile, sample_id).status_code == 200 +def test_runner_valid_success(client, result_zipfile): + headers = _get_auth_headers(client, "runner@abc.xy", "runner") + # request job + request_job_response = client.post( + "/api/runner/request_job", + json={"runner_hostname": "me"}, + headers=headers, + ) + assert request_job_response.status_code == 200 + assert request_job_response.json == {"sample_id": 1, "job_id": 1} + # upload successful result + assert _upload_result(client, result_zipfile, 1, 1).status_code == 200 response = client.post( "/api/result", - json={"sample_id": sample_id}, - headers=headers, + json={"sample_id": 1}, + headers=_get_auth_headers(client, "user@abc.xy", "user"), ) assert response.status_code == 200 assert len(response.data) > 1 +def test_runner_valid_failure(client, result_zipfile): + headers = _get_auth_headers(client, "runner@abc.xy", "runner") + # request job + request_job_response = client.post( + "/api/runner/request_job", + json={"runner_hostname": "me"}, + headers=headers, + ) + assert request_job_response.status_code == 200 + assert request_job_response.json == {"sample_id": 1, "job_id": 1} + # upload failure result + result_response = client.post( + "/api/runner/result", + data={ + "job_id": 1, + "sample_id": 1, + "success": False, + "error_message": "Something went wrong", + }, + headers=headers, + ) + assert result_response.status_code == 200 + response = client.post( + "/api/result", + json={"sample_id": 1}, + headers=_get_auth_headers(client, "user@abc.xy", "user"), + ) + assert response.status_code == 400 + + def test_admin_samples_valid(client): headers = _get_auth_headers(client, "admin@abc.xy", "admin") response = client.get("/api/admin/samples", headers=headers) @@ -288,12 +327,6 @@ def test_admin_users_valid(client): assert "users" in response.json -def test_runner_result_valid(client, result_zipfile): - response = _upload_result(client, result_zipfile, 1) - assert response.status_code == 200 - assert "result processed" in response.json["message"].lower() - - def test_admin_update_user_valid(client): headers = _get_auth_headers(client, "admin@abc.xy", "admin") user = client.get("/api/admin/users", headers=headers).json["users"][0] diff --git a/frontend/src/components/JobsTable.vue b/frontend/src/components/JobsTable.vue new file mode 100644 index 0000000..063fea7 --- /dev/null +++ b/frontend/src/components/JobsTable.vue @@ -0,0 +1,62 @@ + + + diff --git a/frontend/src/utils/types.ts b/frontend/src/utils/types.ts index 6dab111..9a61ff2 100644 --- a/frontend/src/utils/types.ts +++ b/frontend/src/utils/types.ts @@ -31,3 +31,12 @@ export type Settings = { sources: string; csv_required_columns: string; }; + +export type Job = { + id: number; + sample_id: number; + timestamp_start: number; + timestamp_end: number; + status: string; + error_message: string; +}; diff --git a/frontend/src/views/AdminView.vue b/frontend/src/views/AdminView.vue index 8ae35c4..57455e2 100644 --- a/frontend/src/views/AdminView.vue +++ b/frontend/src/views/AdminView.vue @@ -3,6 +3,7 @@ import SamplesTable from "@/components/SamplesTable.vue"; import SettingsTable from "@/components/SettingsTable.vue"; import UsersTable from "@/components/UsersTable.vue"; import ListComponent from "@/components/ListComponent.vue"; +import JobsTable from "@/components/JobsTable.vue"; import ListItem from "@/components/ListItem.vue"; import { FwbButton } from "flowbite-vue"; import { ref } from "vue"; @@ -72,6 +73,9 @@ get_samples(); + + + diff --git a/runner/docker-compose.yml b/runner/docker-compose.yml index 2a8778b..1d3f060 100644 --- a/runner/docker-compose.yml +++ b/runner/docker-compose.yml @@ -10,9 +10,11 @@ services: deploy: mode: replicated replicas: ${PREDICTCR_RUNNER_JOBS:-1} + restart: always networks: - predictcr-network networks: predictcr-network: name: predictcr + external: true diff --git a/runner/src/predicTCR_runner/runner.py b/runner/src/predicTCR_runner/runner.py index 806b6f7..3bebc26 100644 --- a/runner/src/predicTCR_runner/runner.py +++ b/runner/src/predicTCR_runner/runner.py @@ -16,9 +16,13 @@ def __init__(self, api_url: str, jwt_token: str, poll_interval: int = 5): self.poll_interval = poll_interval self.runner_hostname = os.environ.get("HOSTNAME", "unknown") self.logger = logging.getLogger(__name__) + self.job_id: int | None = None + self.sample_id: int | None = None - def _request_job(self) -> int | None: + def _request_job(self) -> bool: self.logger.debug(f"Requesting job from {self.api_url}...") + self.job_id = None + self.sample_id = None response = requests.post( url=f"{self.api_url}/runner/request_job", json={"runner_hostname": self.runner_hostname}, @@ -27,23 +31,28 @@ def _request_job(self) -> int | None: ) if response.status_code == 204: self.logger.debug(" -> no job available.") - return None + return False elif response.status_code == 200: - sample_id = response.json().get("sample_id", None) - self.logger.debug(f" -> sample id {sample_id} available.") - return sample_id + self.job_id = response.json().get("job_id", None) + self.sample_id = response.json().get("sample_id", None) + self.logger.debug( + f" -> job id {self.job_id} for sample id {self.sample_id}." + ) + if self.job_id is not None and self.sample_id is not None: + return True else: self.logger.error( f"request_job failed with {response.status_code}: {response.content}" ) - return None + return False - def _report_job_failed(self, sample_id: int, message: str): - self.logger.info(f"...job failed for sample id {sample_id}.") + def _report_job_failed(self, message: str): + self.logger.info(f"...job {self.job_id} failed for sample id {self.sample_id}.") response = requests.post( url=f"{self.api_url}/runner/result", data={ - "sample_id": sample_id, + "job_id": self.job_id, + "sample_id": self.sample_id, "runner_id": self.runner_hostname, "success": "false", "error_message": message, @@ -54,16 +63,17 @@ def _report_job_failed(self, sample_id: int, message: str): if response.status_code != 200: self.logger.error(f"result with {response.status_code}: {response.content}") - def _upload_result(self, sample_id: int, result_file: str): + def _upload_result(self, result_file: str): self.logger.info( - f"...job finished for sample id {sample_id}, uploading {result_file}..." + f"...job {self.job_id} finished for sample id {self.sample_id}, uploading {result_file}..." ) with open(result_file) as result_file: response = requests.post( url=f"{self.api_url}/runner/result", files={"file": result_file}, data={ - "sample_id": sample_id, + "job_id": self.job_id, + "sample_id": self.sample_id, "runner_hostname": self.runner_hostname, "success": True, }, @@ -73,14 +83,16 @@ def _upload_result(self, sample_id: int, result_file: str): if response.status_code != 200: self.logger.error(f"Failed to upload result: {response.content}") - def _run_job(self, sample_id: int): - self.logger.info(f"Starting job for sample id {sample_id}...") + def _run_job(self): + self.logger.info( + f"Starting job {self.job_id} for sample id {self.sample_id}..." + ) self.logger.debug("Downloading input files...") with tempfile.TemporaryDirectory(delete=False) as tmpdir: for input_file_type in ["h5", "csv"]: response = requests.post( url=f"{self.api_url}/input_{input_file_type}_file", - json={"sample_id": sample_id}, + json={"sample_id": self.sample_id}, headers=self.auth_header, timeout=30, ) @@ -89,7 +101,6 @@ def _run_job(self, sample_id: int): f"Failed to download {input_file_type}: {response.content}" ) return self._report_job_failed( - sample_id, f"Failed to download {input_file_type} on {self.runner_hostname}", ) input_file_name = f"input.{input_file_type}" @@ -104,20 +115,20 @@ def _run_job(self, sample_id: int): self.logger.debug(f" - running {tmpdir}/scripts.sh...") subprocess.run(["sh", "./script.sh"], cwd=tmpdir, check=True) self.logger.debug(f" ...{tmpdir}/script.sh finished.") - self._upload_result(sample_id, f"{tmpdir}/result.zip") + self._upload_result(f"{tmpdir}/result.zip") except Exception as e: self.logger.exception(e) - self.logger.error(f"Failed to run job for sample {sample_id}: {e}") + self.logger.error( + f"Failed to run job {self.job_id} for sample {self.sample_id}: {e}" + ) return self._report_job_failed( - sample_id, f"Error during job execution on {self.runner_hostname}: {e}", ) def start(self): self.logger.info(f"Polling {self.api_url} for jobs...") while True: - job_id = self._request_job() - if job_id is not None: - self._run_job(job_id) + if self._request_job(): + self._run_job() else: time.sleep(self.poll_interval) diff --git a/runner/tests/test_runner.py b/runner/tests/test_runner.py index f044eaa..7e7ddb8 100644 --- a/runner/tests/test_runner.py +++ b/runner/tests/test_runner.py @@ -4,6 +4,8 @@ def test_runner_request_job(requests_mock): requests_mock.post("http://api/runner/request_job", status_code=204) runner = Runner(api_url="http://api", jwt_token="abc") - assert runner._request_job() is None - requests_mock.post("http://api/runner/request_job", json={"sample_id": 44}) - assert runner._request_job() == 44 + assert runner._request_job() is False + requests_mock.post( + "http://api/runner/request_job", json={"job_id": 22, "sample_id": 44} + ) + assert runner._request_job() is True