Add Celery task for file cleanup after fetch job
Adds a small Celery task to delete the directory in which files related
to a fetch job are stored.
mcantelon committed Jan 17, 2023
1 parent d7e9566 commit 945aea0
Showing 3 changed files with 35 additions and 2 deletions.
17 changes: 15 additions & 2 deletions AIPscan/Aggregator/tasks.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import json
 import os
+import shutil

 import requests
 from celery.utils.log import get_task_logger
@@ -185,8 +186,10 @@ def workflow_coordinator(
     )
     total_replicas = len([package for package in all_packages if package.is_replica()])

-    summary = "aips: '{}'; sips: '{}'; dips: '{}'; deleted: '{}'; replicated: '{}'".format(
-        total_aips, total_sips, total_dips, total_deleted_aips, total_replicas
+    summary = (
+        "aips: '{}'; sips: '{}'; dips: '{}'; deleted: '{}'; replicated: '{}'".format(
+            total_aips, total_sips, total_dips, total_deleted_aips, total_replicas
+        )
     )
     logger.info("%s", summary)

@@ -272,6 +275,16 @@ def package_lists_request(self, apiUrl, timestamp, packages_directory):
     }


+@celery.task()
+def fetch_job_file_cleanup(fetch_job_id):
+    obj = FetchJob.query.filter_by(id=fetch_job_id).first()
+
+    if os.path.isdir(obj.download_directory):
+        shutil.rmtree(obj.download_directory)
+
+    logger.info("Cleaned up after fetch job {}".format(fetch_job_id))
+
+
 @celery.task()
 def get_mets(
     package_uuid,
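How the change is wired in: once a fetch job finishes, the application enqueues this cleanup task asynchronously rather than deleting files inside the request cycle (see the views.py change further down). A minimal sketch of that call, assuming a running Celery worker and an existing FetchJob row; the fetch_job_id variable here is illustrative:

    from AIPscan.Aggregator import tasks

    # Hand the cleanup off to a Celery worker; .delay() returns an AsyncResult
    # immediately instead of blocking the caller on the filesystem work.
    tasks.fetch_job_file_cleanup.delay(fetch_job_id)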
17 changes: 17 additions & 0 deletions AIPscan/Aggregator/tests/test_tasks.py
@@ -7,6 +7,7 @@
 from AIPscan import test_helpers
 from AIPscan.Aggregator.tasks import (
     TaskError,
+    fetch_job_file_cleanup,
     get_mets,
     make_request,
     parse_packages_and_load_mets,
@@ -157,6 +158,22 @@ def mock_download_mets(
     delete_mets_file.call_count == 3


+def test_fetch_job_file_cleanup_task(app_instance, tmpdir, mocker):
+    test_downloads_dir = os.path.join(tmpdir, "downloads")
+    os.mkdir(test_downloads_dir)
+
+    # Mock download directory should be created
+    assert os.path.isdir(test_downloads_dir)
+
+    fetch_job1 = test_helpers.create_test_fetch_job(
+        download_directory=test_downloads_dir
+    )
+    fetch_job_file_cleanup(fetch_job1.id)
+
+    # Mock download directory should be removed
+    assert not os.path.isdir(test_downloads_dir)
+
+
 @pytest.mark.parametrize(
     "response, raises_task_error",
     [
3 changes: 3 additions & 0 deletions AIPscan/Aggregator/views.py
@@ -315,6 +315,9 @@ def get_mets_task_status(coordinatorid):
     downloadStart = _format_date(start)
     obj.download_end = downloadEnd
     db.session.commit()
+
+    tasks.fetch_job_file_cleanup.delay(fetchJobId)
+
     response = {"state": "COMPLETED"}
     flash("Fetch Job {} completed".format(downloadStart))
     return jsonify(response)
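A design note visible in the test earlier in this commit: a function decorated with @celery.task() remains an ordinary callable, so the cleanup can also run in-process without a worker, which is how the test invokes it. A minimal sketch, assuming an application context and an existing FetchJob row (the fetch_job name is illustrative):

    from AIPscan.Aggregator.tasks import fetch_job_file_cleanup

    # Direct call: removes the job's download directory immediately in the
    # current process rather than queueing the work for a Celery worker.
    fetch_job_file_cleanup(fetch_job.id)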
