[COST-4242] process and track metered RHEL by resource id (#4713)
* process and track metered RHEL by resource id for latest timestamp
cgoodfred authored Oct 9, 2023
1 parent 9b6b6f5 commit 93141ad
Showing 5 changed files with 238 additions and 127 deletions.
22 changes: 22 additions & 0 deletions koku/reporting/migrations/0309_auto_20231004_1009.py
@@ -0,0 +1,22 @@
# Generated by Django 3.2.21 on 2023-10-04 10:09
from django.db import migrations
from django.db import models


class Migration(migrations.Migration):

    dependencies = [
        ("reporting", "0308_ocpusagelineitemdailysummary_all_labels"),
    ]

    operations = [
        migrations.AddField(
            model_name="subslastprocessed",
            name="resource_id",
            field=models.TextField(default=""),
        ),
        migrations.AlterUniqueTogether(
            name="subslastprocessed",
            unique_together={("source_uuid", "resource_id", "year", "month")},
        ),
    ]
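
With this migration, a SubsLastProcessed row is keyed by (source_uuid, resource_id, year, month) rather than (source_uuid, year, month), so the last processed timestamp can be checkpointed per resource. A minimal sketch of the upsert this enables; the import path and all values below are assumptions for illustration, not part of the commit:

# Sketch only: one checkpoint row per (source, resource id, year, month).
from datetime import datetime

from reporting.provider.models import SubsLastProcessed  # assumed import path


def mark_resource_processed(provider_uuid, rid, year, month, latest_time):
    # In the real extractor this runs inside schema_context(schema); get_or_create
    # now matches on resource_id too, so two instances from the same source no
    # longer collide on the old unique constraint.
    obj, _ = SubsLastProcessed.objects.get_or_create(
        source_uuid_id=provider_uuid, resource_id=rid, year=year, month=month
    )
    obj.latest_processed_time = latest_time
    obj.save()


# Hypothetical usage: each EC2 instance gets its own checkpoint for October 2023.
source = "00000000-0000-0000-0000-000000000000"  # hypothetical source uuid
mark_resource_processed(source, "i-0abc123def456", "2023", "10", datetime(2023, 10, 8, 23, 0))
mark_resource_processed(source, "i-0fed654cba321", "2023", "10", datetime(2023, 10, 8, 22, 0))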
3 changes: 2 additions & 1 deletion koku/reporting/provider/models.py
@@ -29,11 +29,12 @@ class Meta:
"""Meta for subs last processed"""

db_table = "reporting_subs_last_processed_time"
unique_together = ("source_uuid", "year", "month")
unique_together = ("source_uuid", "resource_id", "year", "month")

source_uuid = models.ForeignKey(
"reporting.TenantAPIProvider", on_delete=models.CASCADE, unique=False, null=False, db_column="source_uuid"
)
resource_id = models.TextField(null=False, default="")
year = models.CharField(null=False, max_length=4)
month = models.CharField(null=False, max_length=2)
latest_processed_time = models.DateTimeField(null=True)
123 changes: 81 additions & 42 deletions koku/subs/subs_data_extractor.py
@@ -46,11 +46,11 @@ def subs_s3_path(self):
"""The S3 path to be used for a SUBS report upload."""
return f"{self.schema}/{self.provider_type}/source={self.provider_uuid}/date={self.date_helper.today.date()}"

def determine_latest_processed_time_for_provider(self, year, month):
def determine_latest_processed_time_for_provider(self, rid, year, month):
"""Determine the latest processed timestamp for a provider for a given month and year."""
with schema_context(self.schema):
last_time = SubsLastProcessed.objects.filter(
source_uuid=self.provider_uuid, year=year, month=month
source_uuid=self.provider_uuid, resource_id=rid, year=year, month=month
).first()
if last_time and last_time.latest_processed_time:
# the stored timestamp is the latest timestamp data was gathered for
@@ -90,18 +90,9 @@ def determine_ids_for_provider(self, year, month):
SubsIDMap.objects.bulk_create(bulk_maps, ignore_conflicts=True)
return id_list

def determine_end_time(self, year, month):
"""Determine the end time for subs processing."""
sql = (
f" SELECT MAX(lineitem_usagestartdate) FROM aws_line_items"
f" WHERE source='{self.provider_uuid}' AND year='{year}' AND month='{month}'"
)
latest = self._execute_trino_raw_sql_query(sql, log_ref="insert_subs_last_processed_time")
return latest[0][0]

def determine_start_time(self, year, month, month_start):
def determine_start_time_for_resource(self, rid, year, month, month_start):
"""Determines the start time for subs processing"""
base_time = self.determine_latest_processed_time_for_provider(year, month) or month_start
base_time = self.determine_latest_processed_time_for_provider(rid, year, month) or month_start
created = Provider.objects.get(uuid=self.provider_uuid).created_timestamp
creation_processing_time = created.replace(microsecond=0, second=0, minute=0, hour=0) - timedelta(days=1)
if base_time < creation_processing_time:
@@ -117,51 +108,56 @@ def determine_line_item_count(self, where_clause, sql_params):
)
return count[0][0]

def determine_where_clause_and_params(self, latest_processed_time, end_time, year, month, ids):
def determine_where_clause_and_params(self, latest_processed_time, end_time, year, month, rid):
"""Determine the where clause to use when processing subs data"""
where_clause = (
"WHERE source={{provider_uuid}} AND year={{year}} AND month={{month}} AND"
" lineitem_productcode = 'AmazonEC2' AND lineitem_lineitemtype IN ('Usage', 'SavingsPlanCoveredUsage') AND"
" product_vcpu IS NOT NULL AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0 AND"
" lineitem_usagestartdate > {{latest_processed_time}} AND"
" lineitem_usagestartdate <= {{end_time}}"
" lineitem_usagestartdate <= {{end_time}} AND lineitem_resourceid = {{rid}}"
)
if ids:
where_clause += " AND lineitem_usageaccountid IN {{ids | inclause}}"
sql_params = {
"provider_uuid": self.provider_uuid,
"year": year,
"month": month,
"latest_processed_time": latest_processed_time,
"end_time": end_time,
"ids": ids,
"rid": rid,
}
return where_clause, sql_params
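
As a hedged illustration of what this per-resource filter binds to, here is an assumed parameter set and the effective shape of the rendered clause; every value is hypothetical, and the actual rendering is done by the Trino SQL helper:

# Hypothetical parameter set for a single resource id.
from datetime import datetime

sql_params = {
    "provider_uuid": "00000000-0000-0000-0000-000000000000",
    "year": "2023",
    "month": "10",
    "latest_processed_time": datetime(2023, 10, 3, 0, 0),
    "end_time": datetime(2023, 10, 8, 23, 0),
    "rid": "i-0abc123def456",
}
# Once rendered, the clause selects only RHEL-tagged AmazonEC2 usage rows
# ('Usage' or 'SavingsPlanCoveredUsage') with a vCPU count, for this one resource id,
# strictly newer than latest_processed_time and no later than end_time.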

def update_latest_processed_time(self, year, month, end_time):
"""Update the latest processing time for a provider"""
def get_resource_ids_for_usage_account(self, usage_account, year, month):
"""Determine the relevant resource ids and end time to process to for each resource id."""
with schema_context(self.schema):
subs_obj, _ = SubsLastProcessed.objects.get_or_create(
source_uuid_id=self.provider_uuid, year=year, month=month
# get a list of IDs to exclude from this source processing
excluded_ids = list(
SubsLastProcessed.objects.exclude(source_uuid=self.provider_uuid).values_list("resource_id", flat=True)
)
subs_obj.latest_processed_time = end_time
subs_obj.save()

def extract_data_to_s3(self, month_start):
"""Process new subs related line items from reports to S3."""
LOG.info(log_json(self.tracing_id, msg="beginning subs rhel extraction", context=self.context))
month = month_start.strftime("%m")
year = month_start.strftime("%Y")
start_time = self.determine_start_time(year, month, month_start)
end_time = self.determine_end_time(year, month)
ids = self.determine_ids_for_provider(year, month)
if not ids:
LOG.info(
log_json(self.tracing_id, msg="no valid IDs to process for current source.", context=self.context)
sql = (
"SELECT lineitem_resourceid, max(lineitem_usagestartdate) FROM aws_line_items WHERE"
" source={{provider_uuid}} AND year={{year}} AND month={{month}}"
" AND lineitem_productcode = 'AmazonEC2' AND strpos(lower(resourcetags), 'com_redhat_rhel') > 0"
" AND lineitem_usageaccountid = {{usage_account}}"
)
self.update_latest_processed_time(year, month, end_time)
return []
where_clause, sql_params = self.determine_where_clause_and_params(start_time, end_time, year, month, ids)
if excluded_ids:
sql += "AND lineitem_resourceid NOT IN {{excluded_ids | inclause}}"
sql += " GROUP BY lineitem_resourceid"
sql_params = {
"provider_uuid": self.provider_uuid,
"year": year,
"month": month,
"excluded_ids": excluded_ids,
"usage_account": usage_account,
}
ids = self._execute_trino_raw_sql_query(
sql, sql_params=sql_params, context=self.context, log_ref="subs_determine_rids_for_provider"
)
return ids
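
Each returned row pairs a resource id with the latest usage timestamp seen for it, and resource ids already checkpointed under a different source are excluded from this source's processing. A tiny in-Python paraphrase of that NOT IN exclusion; the real filtering happens in the Trino SQL above, and the data here is hypothetical:

from datetime import datetime

# Rows as the Trino query would return them: (resource id, latest usage timestamp).
rows = [
    ("i-0abc123def456", datetime(2023, 10, 8, 23, 0)),
    ("i-0fed654cba321", datetime(2023, 10, 8, 22, 0)),
]
# Stand-in for the excluded_ids lookup against SubsLastProcessed.
claimed_by_other_sources = {"i-0fed654cba321"}
resource_ids = [(rid, ts) for rid, ts in rows if rid not in claimed_by_other_sources]
# resource_ids -> [("i-0abc123def456", datetime(2023, 10, 8, 23, 0))]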

def gather_and_upload_for_resource(self, rid, year, month, start_time, end_time):
"""Gather the data and upload it to S3 for a specific resource id"""
where_clause, sql_params = self.determine_where_clause_and_params(start_time, end_time, year, month, rid)
total_count = self.determine_line_item_count(where_clause, sql_params)
LOG.debug(
log_json(
@@ -171,7 +167,7 @@ def extract_data_to_s3(self, month_start):
)
)
upload_keys = []
filename = f"subs_{self.tracing_id}_"
filename = f"subs_{self.tracing_id}_{rid}_"
sql_file = f"trino_sql/{self.provider_type.lower()}_subs_summary.sql"
query_sql = pkgutil.get_data("subs", sql_file)
query_sql = query_sql.decode("utf-8")
@@ -185,7 +181,7 @@ def extract_data_to_s3(self, month_start):
"end_time": end_time,
"offset": offset,
"limit": settings.PARQUET_PROCESSING_BATCH_SIZE,
"ids": ids,
"rid": rid,
}
results, description = self._execute_trino_raw_sql_query_with_description(
query_sql, sql_params=sql_params, log_ref=f"{self.provider_type.lower()}_subs_summary.sql"
@@ -197,7 +193,7 @@ def extract_data_to_s3(self, month_start):
cols = [col[0] for col in description]

upload_keys.append(self.copy_data_to_subs_s3_bucket(results, cols, f"{filename}{i}.csv"))
self.update_latest_processed_time(year, month, end_time)
return upload_keys

def bulk_update_latest_processed_time(self, resources, year, month):
"""Bulk update the latest processed time for resources."""
with schema_context(self.schema):
bulk_resources = []
for resource, latest_timestamp in resources:
last_processed_obj = SubsLastProcessed.objects.get_or_create(
source_uuid_id=self.provider_uuid, resource_id=resource, year=year, month=month
)[0]
last_processed_obj.latest_processed_time = latest_timestamp
bulk_resources.append(last_processed_obj)
SubsLastProcessed.objects.bulk_update(bulk_resources, fields=["latest_processed_time"])

def extract_data_to_s3(self, month_start):
"""Process new subs related line items from reports to S3."""
LOG.info(log_json(self.tracing_id, msg="beginning subs rhel extraction", context=self.context))
upload_keys = []
month = month_start.strftime("%m")
year = month_start.strftime("%Y")
usage_accounts = self.determine_ids_for_provider(year, month)
LOG.debug(f"found {len(usage_accounts)} usage accounts associated with provider {self.provider_uuid}")
if not usage_accounts:
LOG.info(
log_json(
self.tracing_id, msg="no valid usage accounts to process for current source.", context=self.context
)
)
return []
for usage_account in usage_accounts:
resource_ids = self.get_resource_ids_for_usage_account(usage_account, year, month)
if not resource_ids:
LOG.debug(
log_json(
self.tracing_id,
msg=f"no relevant resource ids found for usage account {usage_account}.",
context=self.context,
)
)
continue
for rid, end_time in resource_ids:
start_time = self.determine_start_time_for_resource(rid, year, month, month_start)
upload_keys.extend(self.gather_and_upload_for_resource(rid, year, month, start_time, end_time))
self.bulk_update_latest_processed_time(resource_ids, year, month)
LOG.info(
log_json(
self.tracing_id,