Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete bulk submissions asynchronously #2738

Merged
merged 19 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions onadata/apps/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,33 @@
"""
Celery api.tasks module.
"""

import logging
import os
import sys
import logging
from datetime import timedelta

from celery.result import AsyncResult
from django.conf import settings
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.core.files.storage import default_storage
from django.contrib.auth import get_user_model
from django.core.files.storage import default_storage
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.db import DatabaseError
from django.utils import timezone
from django.utils.datastructures import MultiValueDict

from celery.result import AsyncResult

from onadata.apps.api import tools
from onadata.apps.logger.models import Instance, ProjectInvitation, XForm, Project
from onadata.apps.logger.models import Instance, Project, ProjectInvitation, XForm
from onadata.celeryapp import app
from onadata.libs.utils.email import send_generic_email
from onadata.libs.utils.model_tools import queryset_iterator
from onadata.libs.models.share_project import ShareProject
from onadata.libs.utils.cache_tools import (
safe_delete,
XFORM_REGENERATE_INSTANCE_JSON_TASK,
safe_delete,
)
from onadata.libs.models.share_project import ShareProject
from onadata.libs.utils.email import ProjectInvitationEmail
from onadata.libs.utils.email import ProjectInvitationEmail, send_generic_email
from onadata.libs.utils.logger_tools import delete_xform_submissions
from onadata.libs.utils.model_tools import queryset_iterator

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -200,3 +202,28 @@ def share_project_async(project_id, username, role, remove=False):
else:
share = ShareProject(project, username, role, remove)
share.save()


@app.task(retry_backoff=3, autoretry_for=(DatabaseError, ConnectionError))
def delete_xform_submissions_async(
xform_id: int,
deleted_by_id: int,
instance_ids: list[int] | None = None,
soft_delete: bool = True,
):
"""Delete xform submissions asynchronously

:param xform_id: XForm id
:param deleted_by_id: User id who deleted the instances
:param instance_ids: List of instance ids to delete, None to delete all
:param soft_delete: Soft delete instances if True, otherwise hard delete
"""
try:
xform = XForm.objects.get(pk=xform_id)
deleted_by = User.objects.get(pk=deleted_by_id)

except (XForm.DoesNotExist, User.DoesNotExist) as err:
logger.exception(err)

else:
delete_xform_submissions(xform, deleted_by, instance_ids, soft_delete)
57 changes: 50 additions & 7 deletions onadata/apps/api/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
"""Tests for module onadata.apps.api.tasks"""

import sys

from unittest.mock import patch

from django.core.cache import cache
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.db import DatabaseError, OperationalError

from onadata.apps.api.tasks import (
send_project_invitation_email_async,
ShareProject,
delete_xform_submissions_async,
regenerate_form_instance_json,
send_project_invitation_email_async,
share_project_async,
ShareProject,
)
from onadata.apps.logger.models import ProjectInvitation, Instance
from onadata.apps.logger.models import Instance, ProjectInvitation
from onadata.apps.main.tests.test_base import TestBase
from onadata.libs.permissions import ManagerRole
from onadata.libs.serializers.organization_serializer import OrganizationSerializer
from onadata.libs.utils.user_auth import get_user_default_project
from onadata.libs.utils.email import ProjectInvitationEmail
from onadata.libs.utils.cache_tools import ORG_PROFILE_CACHE
from onadata.libs.utils.email import ProjectInvitationEmail
from onadata.libs.utils.user_auth import get_user_default_project

User = get_user_model()

Expand Down Expand Up @@ -185,3 +185,46 @@ def test_operation_error(self, mock_retry, mock_share):
self.assertTrue(mock_retry.called)
_, kwargs = mock_retry.call_args_list[0]
self.assertTrue(isinstance(kwargs["exc"], OperationalError))


@patch("onadata.apps.api.tasks.delete_xform_submissions")
class DeleteXFormSubmissionsAsyncTestCase(TestBase):
"""Tests for delete_xform_submissions_async"""

def setUp(self):
super().setUp()

self._publish_transportation_form()

def test_delete(self, mock_delete):
"""Submissions are deleted"""
delete_xform_submissions_async.delay(self.xform.pk, self.user.pk, [1, 2], False)
mock_delete.assert_called_once_with(self.xform, self.user, [1, 2], False)

@patch("onadata.apps.api.tasks.delete_xform_submissions_async.retry")
def test_database_error(self, mock_retry, mock_delete):
"""We retry calls if DatabaseError is raised"""
mock_delete.side_effect = DatabaseError()
delete_xform_submissions_async.delay(self.xform.pk, self.user.pk)
self.assertTrue(mock_retry.called)

@patch("onadata.apps.api.tasks.delete_xform_submissions_async.retry")
def test_connection_error(self, mock_retry, mock_delete):
"""We retry calls if ConnectionError is raised"""
mock_delete.side_effect = ConnectionError()
delete_xform_submissions_async.delay(self.xform.pk, self.user.pk)
self.assertTrue(mock_retry.called)

@patch("onadata.apps.api.tasks.logger.exception")
def test_xform_id_invalid(self, mock_logger, mock_delete):
"""Invalid xform_id is handled"""
delete_xform_submissions_async.delay(sys.maxsize, self.user.pk)
self.assertFalse(mock_delete.called)
mock_logger.assert_called_once()

@patch("onadata.apps.api.tasks.logger.exception")
def test_user_id_invalid(self, mock_logger, mock_delete):
"""Invalid user_id is handled"""
delete_xform_submissions_async.delay(self.xform.pk, sys.maxsize)
self.assertFalse(mock_delete.called)
mock_logger.assert_called_once()
127 changes: 86 additions & 41 deletions onadata/apps/api/tests/viewsets/test_data_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""
Test /data API endpoint implementation.
"""

from __future__ import unicode_literals

import csv
Expand Down Expand Up @@ -1671,8 +1672,7 @@ def test_data_w_attachment(self):
self.assertIsInstance(response.data, dict)
self.assertDictContainsSubset(data, response.data)

@patch("onadata.apps.api.viewsets.data_viewset.send_message")
def test_delete_submission(self, send_message_mock):
def test_delete_submission(self):
self._make_submissions()
formid = self.xform.pk
dataid = self.xform.instances.all().order_by("id")[0].pk
Expand All @@ -1691,15 +1691,6 @@ def test_delete_submission(self, send_message_mock):
self.assertEqual(response.status_code, 204)
first_xform_instance = self.xform.instances.filter(pk=dataid)
self.assertEqual(first_xform_instance[0].deleted_by, request.user)
# message sent upon delete
self.assertTrue(send_message_mock.called)
send_message_mock.assert_called_with(
instance_id=dataid,
target_id=formid,
target_type=XFORM,
user=request.user,
message_verb=SUBMISSION_DELETED,
)

# second delete of same submission should return 404
request = self.factory.delete("/", **self.extra)
Expand Down Expand Up @@ -1767,8 +1758,8 @@ def test_post_save_signal_on_submission_deletion(self, mock, send_message_mock):
self.assertEqual(mock.call_count, 1)
self.assertTrue(send_message_mock.called)

@patch("onadata.apps.api.viewsets.data_viewset.send_message")
def test_deletion_of_bulk_submissions(self, send_message_mock):
@patch("onadata.apps.api.viewsets.data_viewset.safe_cache_set")
def test_deletion_of_bulk_submissions(self, mock_cache_set):
self._make_submissions()
self.xform.refresh_from_db()
formid = self.xform.pk
Expand Down Expand Up @@ -1804,19 +1795,16 @@ def test_deletion_of_bulk_submissions(self, send_message_mock):
response.data.get("message"),
"%d records were deleted" % len(records_to_be_deleted),
)
self.assertTrue(send_message_mock.called)
send_message_mock.assert_called_with(
instance_id=[str(i.pk) for i in records_to_be_deleted],
target_id=formid,
target_type=XFORM,
user=request.user,
message_verb=SUBMISSION_DELETED,
)
self.xform.refresh_from_db()
current_count = self.xform.instances.filter(deleted_at=None).count()
self.assertNotEqual(current_count, initial_count)
self.assertEqual(current_count, 2)
self.assertEqual(self.xform.num_of_submissions, 2)
mock_cache_set.assert_called_once_with(
f"xfm-submissions-deleting-{formid}",
[str(i.pk) for i in records_to_be_deleted],
3600,
)

@override_settings(ENABLE_SUBMISSION_PERMANENT_DELETE=True)
@patch("onadata.apps.api.viewsets.data_viewset.send_message")
Expand Down Expand Up @@ -1879,8 +1867,7 @@ def test_submissions_permanent_deletion(self, send_message_mock):
self.assertEqual(self.xform.num_of_submissions, 3)

@override_settings(ENABLE_SUBMISSION_PERMANENT_DELETE=True)
@patch("onadata.apps.api.viewsets.data_viewset.send_message")
def test_permanent_deletions_bulk_submissions(self, send_message_mock):
def test_permanent_deletions_bulk_submissions(self):
"""
Test that permanent bulk submission deletions work
"""
Expand All @@ -1904,14 +1891,6 @@ def test_permanent_deletions_bulk_submissions(self, send_message_mock):
response.data.get("message"),
"%d records were deleted" % len(records_to_be_deleted),
)
self.assertTrue(send_message_mock.called)
send_message_mock.assert_called_with(
instance_id=[str(i.pk) for i in records_to_be_deleted],
target_id=formid,
target_type=XFORM,
user=request.user,
message_verb=SUBMISSION_DELETED,
)
self.xform.refresh_from_db()
current_count = self.xform.num_of_submissions
self.assertNotEqual(current_count, initial_count)
Expand Down Expand Up @@ -2016,8 +1995,7 @@ def test_delete_submission_inactive_form(self, send_message_mock):
self.assertEqual(response.status_code, 400)
self.assertTrue(send_message_mock.called)

@patch("onadata.apps.api.viewsets.data_viewset.send_message")
def test_delete_submissions(self, send_message_mock):
def test_delete_submissions(self):
xls_file_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"../fixtures/tutorial/tutorial.xlsx",
Expand Down Expand Up @@ -2057,14 +2035,6 @@ def test_delete_submissions(self, send_message_mock):
response.data.get("message"),
"%d records were deleted" % len(deleted_instances_subset),
)
self.assertTrue(send_message_mock.called)
send_message_mock.assert_called_with(
instance_id=[str(i.pk) for i in deleted_instances_subset],
target_id=formid,
target_type=XFORM,
user=request.user,
message_verb=SUBMISSION_DELETED,
)

# Test that num of submissions for the form is successfully updated
self.xform.refresh_from_db()
Expand Down Expand Up @@ -3825,6 +3795,81 @@ def test_merged_dataset_geojson(self):
response.data,
)

def test_submissions_deletion_in_progress(self):
"""Submissions whose deletion is in progress are excluded from list"""
self._make_submissions()
self.assertEqual(self.xform.instances.count(), 4)
view = DataViewSet.as_view({"get": "list"})
formid = self.xform.pk
instances = self.xform.instances.all()
cache.set(
f"xfm-submissions-deleting-{self.xform.pk}",
[instances[0].pk, instances[1].pk],
)
# No query
request = self.factory.get("/", **self.extra)
response = view(request, pk=formid)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
# With query
data = {"query": '{"_submission_time":{"$gt":"2018-04-19"}}'}
request = self.factory.get("/", **self.extra, data=data)
response = view(request, pk=formid)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
# With sort
data = {"sort": 1}
request = self.factory.get("/", **self.extra, data=data)
response = view(request, pk=formid)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
# Cached submission ids saved as strings
cache.set(
f"xfm-submissions-deleting-{self.xform.pk}",
[str(instances[0].pk), str(instances[1].pk)],
)
request = self.factory.get("/", **self.extra)
response = view(request, pk=formid)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)

@override_settings(ENABLE_SUBMISSION_PERMANENT_DELETE=True)
@patch(
"onadata.apps.api.viewsets.data_viewset.delete_xform_submissions_async.delay"
)
def test_deletion_of_bulk_submissions_async(self, mock_del_async):
"""Deletion of bulk submissions is done asynchronously"""
self._make_submissions()

view = DataViewSet.as_view({"delete": "destroy"})

records_to_be_deleted = self.xform.instances.all()[:2]
instance_ids = ",".join([str(i.pk) for i in records_to_be_deleted])
data = {"instance_ids": instance_ids}
request = self.factory.delete("/", data=data, **self.extra)
response = view(request, pk=self.xform.pk)

self.assertEqual(response.status_code, 200)
mock_del_async.assert_called_once_with(
self.xform.pk,
self.user.pk,
[str(records_to_be_deleted[0].pk), str(records_to_be_deleted[1].pk)],
True,
)
# Permanent deletion
mock_del_async.reset_mock() # Reset mock
data = {"permanent_delete": True, "instance_ids": instance_ids}
request = self.factory.delete("/", data=data, **self.extra)
response = view(request, pk=self.xform.pk)

self.assertEqual(response.status_code, 200)
mock_del_async.assert_called_once_with(
self.xform.pk,
self.user.pk,
[str(records_to_be_deleted[0].pk), str(records_to_be_deleted[1].pk)],
False,
)


class TestOSM(TestAbstractViewSet):
"""
Expand Down
Loading