Skip to content

Commit

Permalink
Make fetch job deletion a Celery task.
Browse files Browse the repository at this point in the history
Move fetch job deletion logic into a Celery task to avoid timeouts.
  • Loading branch information
mcantelon committed Sep 3, 2023
1 parent c45b917 commit 3e100a3
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
12 changes: 11 additions & 1 deletion AIPscan/Aggregator/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import json
import os
import shutil

import requests
from celery.utils.log import get_task_logger
Expand All @@ -20,7 +21,7 @@
)
from AIPscan.extensions import celery
from AIPscan.helpers import file_sha256_hash
from AIPscan.models import AIP, get_mets_tasks # Custom celery Models.
from AIPscan.models import AIP, FetchJob, get_mets_tasks # Custom celery Models.

logger = get_task_logger(__name__)

Expand Down Expand Up @@ -359,3 +360,12 @@ def get_mets(
os.remove(download_file)
except OSError as err:
tasklogger.warning("Unable to delete METS file: {}".format(err))


@celery.task()
def delete_fetch_job(fetch_job_id):
fetch_job = FetchJob.query.get(fetch_job_id)
if os.path.exists(fetch_job.download_directory):
shutil.rmtree(fetch_job.download_directory)

Check warning on line 369 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L369

Added line #L369 was not covered by tests
db.session.delete(fetch_job)
db.session.commit()
20 changes: 19 additions & 1 deletion AIPscan/Aggregator/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from AIPscan import test_helpers
from AIPscan.Aggregator.tasks import (
TaskError,
delete_fetch_job,
get_mets,
make_request,
parse_packages_and_load_mets,
Expand All @@ -21,7 +22,7 @@
VALID_JSON,
MockResponse,
)
from AIPscan.models import AIP
from AIPscan.models import AIP, FetchJob

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
FIXTURES_DIR = os.path.join(SCRIPT_DIR, "fixtures")
Expand Down Expand Up @@ -174,6 +175,23 @@ def mock_download_mets(
)


def test_delete_fetch_job_task(app_instance, tmpdir, mocker):
"""Test that fetch job gets deleted by delete fetch job task logic."""
storage_service = test_helpers.create_test_storage_service()

# Create fetch job and confirm expected ID
fetch_job1 = test_helpers.create_test_fetch_job(
storage_service_id=storage_service.id
)

assert fetch_job1.id == 1

# Delete fetch job and confirm it no longer exists
delete_fetch_job(fetch_job1.id)

assert FetchJob.query.filter_by(id=fetch_job1.id).first() is None


@pytest.mark.parametrize(
"response, raises_task_error",
[
Expand Down
7 changes: 2 additions & 5 deletions AIPscan/Aggregator/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,10 @@ def new_fetch_job(id):

@aggregator.route("/delete_fetch_job/<id>", methods=["GET"])
def delete_fetch_job(id):
tasks.delete_fetch_job.delay(id)

Check warning on line 235 in AIPscan/Aggregator/views.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/views.py#L235

Added line #L235 was not covered by tests
fetch_job = FetchJob.query.get(id)
storage_service = StorageService.query.get(fetch_job.storage_service_id)
if os.path.exists(fetch_job.download_directory):
shutil.rmtree(fetch_job.download_directory)
db.session.delete(fetch_job)
db.session.commit()
flash("Fetch job {} is deleted".format(fetch_job.download_start))
flash("Fetch job {} is being deleted".format(fetch_job.download_start))

Check warning on line 238 in AIPscan/Aggregator/views.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/views.py#L238

Added line #L238 was not covered by tests
return redirect(url_for("aggregator.storage_service", id=storage_service.id))


Expand Down

0 comments on commit 3e100a3

Please sign in to comment.