Skip to content

Commit

Permalink
one expiration task per provider type
Browse files Browse the repository at this point in the history
  • Loading branch information
maskarb committed Jan 2, 2025
1 parent 0babb2a commit 5fd6f29
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 16 deletions.
2 changes: 1 addition & 1 deletion koku/masu/processor/ocp/ocp_report_db_cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def purge_expired_report_data_by_date(self, expired_date, simulate=False):
LOG.info(log_json(msg="deleted table partitions", count=del_count, schema=self._schema))

# Remove all data related to the report period
del_count, _ = OCPUsageReportPeriod.objects.filter(id__in=all_report_periods).delete()
del_count = execute_delete_sql(OCPUsageReportPeriod.objects.filter(id__in=all_report_periods))
LOG.info(
log_json(
msg="deleted ocp-usage-report-periods",
Expand Down
36 changes: 25 additions & 11 deletions koku/masu/processor/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""Report Processing Orchestrator."""
import copy
import logging
from collections import defaultdict
from datetime import datetime
from datetime import timedelta

Expand Down Expand Up @@ -41,7 +42,6 @@
from subs.tasks import extract_subs_data_from_reports
from subs.tasks import SUBS_EXTRACTION_QUEUE


LOG = logging.getLogger(__name__)


Expand Down Expand Up @@ -565,17 +565,31 @@ def remove_expired_report_data(self, simulate=False):
"""
async_results = []
schemas = defaultdict(set)
for account in Provider.objects.get_accounts():
LOG.info("Calling remove_expired_data with account: %s", account)
async_result = remove_expired_data.delay(
schema_name=account.get("schema_name"), provider=account.get("provider_type"), simulate=simulate
)
LOG.info(
"Expired data removal queued - schema_name: %s, Task ID: %s",
account.get("schema_name"),
str(async_result),
)
async_results.append({"customer": account.get("customer_name"), "async_id": str(async_result)})
# create a dict of {schema: set(provider_types)}
schemas[account.get("schema_name")].add(account.get("provider_type"))
for schema, provider_types in schemas.items():
for provider_type in provider_types:
LOG.info(
log_json(
"remove_expired_report_data",
msg="calling remove_expired_data",
schema=schema,
provider_type=provider_type,
)
)
async_result = remove_expired_data.delay(schema_name=schema, provider=provider_type, simulate=simulate)
LOG.info(
log_json(
"remove_expired_report_data",
msg="expired data removal queued",
schema=schema,
provider_type=provider_type,
task_id=str(async_result),
)
)
async_results.append({"schema": schema, "provider_type": provider_type, "async_id": str(async_result)})
return async_results

def remove_expired_trino_partitions(self, simulate=False):
Expand Down
16 changes: 12 additions & 4 deletions koku/masu/test/api/test_expired_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ class ExpiredDataTest(TestCase):
@patch.object(Orchestrator, "remove_expired_report_data")
def test_get_expired_data(self, mock_orchestrator, _, mock_service):
"""Test the GET expired_data endpoint."""
mock_response = [{"customer": "org1234567", "async_id": "f9eb2ce7-4564-4509-aecc-1200958c07cf"}]
mock_response = [
{"schema": "org1234567", "provider_type": "OCP", "async_id": "f9eb2ce7-4564-4509-aecc-1200958c07cf"}
]
expected_key = "Async jobs for expired data removal (simulated)"
mock_orchestrator.return_value = mock_response
response = self.client.get(reverse("expired_data"))
Expand All @@ -38,7 +40,9 @@ def test_get_expired_data(self, mock_orchestrator, _, mock_service):
@patch.object(Orchestrator, "remove_expired_report_data")
def test_del_expired_data(self, mock_orchestrator, mock_debug, _, mock_service):
"""Test the DELETE expired_data endpoint."""
mock_response = [{"customer": "org1234567", "async_id": "f9eb2ce7-4564-4509-aecc-1200958c07cf"}]
mock_response = [
{"schema": "org1234567", "provider_type": "OCP", "async_id": "f9eb2ce7-4564-4509-aecc-1200958c07cf"}
]
expected_key = "Async jobs for expired data removal"
mock_orchestrator.return_value = mock_response

Expand All @@ -54,7 +58,9 @@ def test_del_expired_data(self, mock_orchestrator, mock_debug, _, mock_service):
@patch.object(Orchestrator, "remove_expired_trino_partitions")
def test_get_expired_partitions(self, mock_orchestrator, _, mock_service):
"""Test the GET expired_trino_paritions endpoint."""
mock_response = [{"customer": "org1234567", "async_id": "f9eb2ce7-4564-4509-aecc-1200958c07cf"}]
mock_response = [
{"schema": "org1234567", "provider_type": "OCP", "async_id": "f9eb2ce7-4564-4509-aecc-1200958c07cf"}
]
expected_key = "Async jobs for expired paritions removal (simulated)"
mock_orchestrator.return_value = mock_response
response = self.client.get(reverse("expired_trino_partitions"))
Expand All @@ -71,7 +77,9 @@ def test_get_expired_partitions(self, mock_orchestrator, _, mock_service):
@patch.object(Orchestrator, "remove_expired_trino_partitions")
def test_del_expired_partitions(self, mock_orchestrator, mock_debug, _, mock_service):
"""Test the DELETE expired_trino_partitions endpoint."""
mock_response = [{"customer": "org1234567", "async_id": "f9eb2ce7-4564-4509-aecc-1200958c07cf"}]
mock_response = [
{"schema": "org1234567", "provider_type": "OCP", "async_id": "f9eb2ce7-4564-4509-aecc-1200958c07cf"}
]
expected_key = "Async jobs for expired paritions removal"
mock_orchestrator.return_value = mock_response

Expand Down

0 comments on commit 5fd6f29

Please sign in to comment.