From 3e100a3c0875da686eecf65f943152146dbb9751 Mon Sep 17 00:00:00 2001 From: Mike Cantelon Date: Fri, 1 Sep 2023 12:05:10 -0700 Subject: [PATCH] Make fetch job deletion a Celery task. Move fetch job deletion logic into a Celery task to avoid timeouts. --- AIPscan/Aggregator/tasks.py | 12 +++++++++++- AIPscan/Aggregator/tests/test_tasks.py | 20 +++++++++++++++++++- AIPscan/Aggregator/views.py | 7 ++----- 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/AIPscan/Aggregator/tasks.py b/AIPscan/Aggregator/tasks.py index b52e579c..fec5e4e9 100644 --- a/AIPscan/Aggregator/tasks.py +++ b/AIPscan/Aggregator/tasks.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import json import os +import shutil import requests from celery.utils.log import get_task_logger @@ -20,7 +21,7 @@ ) from AIPscan.extensions import celery from AIPscan.helpers import file_sha256_hash -from AIPscan.models import AIP, get_mets_tasks # Custom celery Models. +from AIPscan.models import AIP, FetchJob, get_mets_tasks # Custom celery Models. logger = get_task_logger(__name__) @@ -359,3 +360,12 @@ def get_mets( os.remove(download_file) except OSError as err: tasklogger.warning("Unable to delete METS file: {}".format(err)) + + +@celery.task() +def delete_fetch_job(fetch_job_id): + fetch_job = FetchJob.query.get(fetch_job_id) + if os.path.exists(fetch_job.download_directory): + shutil.rmtree(fetch_job.download_directory) + db.session.delete(fetch_job) + db.session.commit() diff --git a/AIPscan/Aggregator/tests/test_tasks.py b/AIPscan/Aggregator/tests/test_tasks.py index f18080bc..aebe54dd 100644 --- a/AIPscan/Aggregator/tests/test_tasks.py +++ b/AIPscan/Aggregator/tests/test_tasks.py @@ -9,6 +9,7 @@ from AIPscan import test_helpers from AIPscan.Aggregator.tasks import ( TaskError, + delete_fetch_job, get_mets, make_request, parse_packages_and_load_mets, @@ -21,7 +22,7 @@ VALID_JSON, MockResponse, ) -from AIPscan.models import AIP +from AIPscan.models import AIP, FetchJob SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) FIXTURES_DIR = os.path.join(SCRIPT_DIR, "fixtures") @@ -174,6 +175,23 @@ def mock_download_mets( ) +def test_delete_fetch_job_task(app_instance, tmpdir, mocker): + """Test that fetch job gets deleted by delete fetch job task logic.""" + storage_service = test_helpers.create_test_storage_service() + + # Create fetch job and confirm expected ID + fetch_job1 = test_helpers.create_test_fetch_job( + storage_service_id=storage_service.id + ) + + assert fetch_job1.id == 1 + + # Delete fetch job and confirm it no longer exists + delete_fetch_job(fetch_job1.id) + + assert FetchJob.query.filter_by(id=fetch_job1.id).first() is None + + @pytest.mark.parametrize( "response, raises_task_error", [ diff --git a/AIPscan/Aggregator/views.py b/AIPscan/Aggregator/views.py index 3f50fc4e..55395bcb 100644 --- a/AIPscan/Aggregator/views.py +++ b/AIPscan/Aggregator/views.py @@ -232,13 +232,10 @@ def new_fetch_job(id): @aggregator.route("/delete_fetch_job/", methods=["GET"]) def delete_fetch_job(id): + tasks.delete_fetch_job.delay(id) fetch_job = FetchJob.query.get(id) storage_service = StorageService.query.get(fetch_job.storage_service_id) - if os.path.exists(fetch_job.download_directory): - shutil.rmtree(fetch_job.download_directory) - db.session.delete(fetch_job) - db.session.commit() - flash("Fetch job {} is deleted".format(fetch_job.download_start)) + flash("Fetch job {} is being deleted".format(fetch_job.download_start)) return redirect(url_for("aggregator.storage_service", id=storage_service.id))