Skip to content

Commit

Permalink
[COST-5132] OCP AWS Parquet Processing (#5263)
Browse files Browse the repository at this point in the history
* [COST-5129] OCP AWS Parquet Processing via Trino

* default to summary queue

* handle table cleanup on ocp source delete

* handle csi_volume_labels and change the matching to endswith to match the old way
  • Loading branch information
cgoodfred authored Aug 29, 2024
1 parent 66e2f9e commit 2160741
Show file tree
Hide file tree
Showing 13 changed files with 586 additions and 63 deletions.
4 changes: 4 additions & 0 deletions koku/api/provider/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class Meta:
PROVIDER_GCP,
PROVIDER_GCP_LOCAL,
]
MANAGED_OPENSHIFT_ON_CLOUD_PROVIDER_LIST = [
PROVIDER_AWS,
PROVIDER_AWS_LOCAL,
]

uuid = models.UUIDField(default=uuid4, primary_key=True)
name = models.CharField(max_length=256, null=False)
Expand Down
57 changes: 53 additions & 4 deletions koku/masu/database/aws_report_db_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import uuid

from dateutil.parser import parse
from django.conf import settings
from django.db import connection
from django.db.models import F
from django.db.models import Q
Expand All @@ -32,6 +33,7 @@
from reporting.provider.all.models import TagMapping
from reporting.provider.aws.models import AWSCostEntryBill
from reporting.provider.aws.models import AWSCostEntryLineItemDailySummary
from reporting.provider.aws.models import TRINO_MANAGED_OCP_AWS_DAILY_TABLE
from reporting.provider.aws.models import UI_SUMMARY_TABLES
from reporting.provider.aws.openshift.models import UI_SUMMARY_TABLES as OCPAWS_UI_SUMMARY_TABLES

Expand Down Expand Up @@ -176,9 +178,10 @@ def populate_ocp_on_aws_ui_summary_tables_trino(
}
self._execute_trino_raw_sql_query(sql, sql_params=sql_params, log_ref=f"{table_name}.sql")

def delete_ocp_on_aws_hive_partition_by_day(self, days, aws_source, ocp_source, year, month):
def delete_ocp_on_aws_hive_partition_by_day(
self, days, aws_source, ocp_source, year, month, table="reporting_ocpawscostlineitem_project_daily_summary"
):
"""Deletes partitions individually for each day in days list."""
table = "reporting_ocpawscostlineitem_project_daily_summary"
if self.schema_exists_trino() and self.table_exists_trino(table):
LOG.info(
log_json(
Expand All @@ -193,16 +196,20 @@ def delete_ocp_on_aws_hive_partition_by_day(self, days, aws_source, ocp_source,
)
)
for day in days:
if table == TRINO_MANAGED_OCP_AWS_DAILY_TABLE:
column_name = "source"
else:
column_name = "aws_source"
sql = f"""
DELETE FROM hive.{self.schema}.{table}
WHERE aws_source = '{aws_source}'
WHERE {column_name} = '{aws_source}'
AND ocp_source = '{ocp_source}'
AND year = '{year}'
AND (month = replace(ltrim(replace('{month}', '0', ' ')),' ', '0') OR month = '{month}')
AND day = '{day}'"""
self._execute_trino_raw_sql_query(
sql,
log_ref=f"delete_ocp_on_aws_hive_partition_by_day for {year}-{month}-{day}",
log_ref=f"delete_ocp_on_aws_hive_partition_by_day for {year}-{month}-{day} from {table}",
)

def populate_ocp_on_aws_cost_daily_summary_trino(
Expand Down Expand Up @@ -468,3 +475,45 @@ def populate_ec2_compute_summary_table_trino(self, source_uuid, start_date, bill
}

self._execute_trino_raw_sql_query(sql, sql_params=sql_params, log_ref=f"{table_name}.sql")

def populate_ocp_on_cloud_daily_trino(
self, aws_provider_uuid, openshift_provider_uuid, start_date, end_date, matched_tags
):
"""Populate the aws_openshift_daily trino table for OCP on AWS.
Args:
aws_provider_uuid (UUID) GCP source UUID.
ocp_provider_uuid (UUID) OCP source UUID.
start_date (datetime.date) The date to start populating the table.
end_date (datetime.date) The date to end on.
matched_tag_strs (str) matching tags.
Returns
(None)
"""
if type(start_date) == str:
start_date = parse(start_date).astimezone(tz=settings.UTC)
if type(end_date) == str:
end_date = parse(end_date).astimezone(tz=settings.UTC)
year = start_date.strftime("%Y")
month = start_date.strftime("%m")
table = TRINO_MANAGED_OCP_AWS_DAILY_TABLE
days = self.date_helper.list_days(start_date, end_date)
days_tup = tuple(str(day.day) for day in days)
self.delete_ocp_on_aws_hive_partition_by_day(
days_tup, aws_provider_uuid, openshift_provider_uuid, year, month, table
)

summary_sql = pkgutil.get_data("masu.database", "trino_sql/aws/openshift/managed_aws_openshift_daily.sql")
summary_sql = summary_sql.decode("utf-8")
summary_sql_params = {
"schema": self.schema,
"start_date": start_date,
"year": year,
"month": month,
"days": days_tup,
"end_date": end_date,
"aws_source_uuid": aws_provider_uuid,
"ocp_source_uuid": openshift_provider_uuid,
"matched_tag_array": matched_tags,
}
LOG.info(log_json(msg="running managed OCP on AWS daily SQL", **summary_sql_params))
self._execute_trino_multipart_sql_query(summary_sql, bind_params=summary_sql_params)
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
-- Now create our proper table if it does not exist
CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_aws_openshift_daily
(
lineitem_resourceid varchar,
lineitem_usagestartdate timestamp(3),
bill_payeraccountid varchar,
lineitem_usageaccountid varchar,
lineitem_legalentity varchar,
lineitem_lineitemdescription varchar,
bill_billingentity varchar,
lineitem_productcode varchar,
lineitem_availabilityzone varchar,
lineitem_lineitemtype varchar,
lineitem_usagetype varchar,
lineitem_operation varchar,
product_productfamily varchar,
product_instancetype varchar,
product_region varchar,
pricing_unit varchar,
resourcetags varchar,
costcategory varchar,
lineitem_usageamount double,
lineitem_normalizationfactor double,
lineitem_normalizedusageamount double,
lineitem_currencycode varchar,
lineitem_unblendedrate double,
lineitem_unblendedcost double,
lineitem_blendedrate double,
lineitem_blendedcost double,
pricing_publicondemandcost double,
pricing_publicondemandrate double,
savingsplan_savingsplaneffectivecost double,
bill_invoiceid varchar,
product_productname varchar,
product_vcpu varchar,
product_memory varchar,
resource_id_matched boolean,
matched_tag varchar,
source varchar,
ocp_source varchar,
year varchar,
month varchar,
day varchar
) WITH(format = 'PARQUET', partitioned_by=ARRAY['source', 'ocp_source', 'year', 'month', 'day'])
;

-- Direct resource matching
INSERT INTO hive.{{schema | sqlsafe}}.managed_aws_openshift_daily (
lineitem_resourceid,
lineitem_usagestartdate,
bill_payeraccountid,
lineitem_usageaccountid,
lineitem_legalentity,
lineitem_lineitemdescription,
bill_billingentity,
lineitem_productcode,
lineitem_availabilityzone,
lineitem_lineitemtype,
product_productfamily,
product_instancetype,
product_region,
pricing_unit,
resourcetags,
costcategory,
lineitem_usageamount,
lineitem_normalizationfactor,
lineitem_normalizedusageamount,
lineitem_currencycode,
lineitem_unblendedrate,
lineitem_unblendedcost,
lineitem_blendedrate,
lineitem_blendedcost,
pricing_publicondemandcost,
pricing_publicondemandrate,
savingsplan_savingsplaneffectivecost,
product_productname,
bill_invoiceid,
resource_id_matched,
matched_tag,
source,
ocp_source,
year,
month,
day
)
WITH cte_aws_resource_names AS (
SELECT DISTINCT lineitem_resourceid
FROM hive.{{schema | sqlsafe}}.aws_line_items_daily
WHERE source = {{aws_source_uuid}}
AND year = {{year}}
AND month = {{month}}
AND lineitem_usagestartdate >= {{start_date}}
AND lineitem_usagestartdate < date_add('day', 1, {{end_date}})
),
cte_array_agg_nodes AS (
SELECT DISTINCT resource_id
FROM hive.{{schema | sqlsafe}}.openshift_pod_usage_line_items_daily
WHERE source = {{ocp_source_uuid}}
AND year = {{year}}
AND month = {{month}}
AND interval_start >= {{start_date}}
AND interval_start < date_add('day', 1, {{end_date}})
),
cte_array_agg_volumes AS (
SELECT DISTINCT persistentvolume, csi_volume_handle
FROM hive.{{schema | sqlsafe}}.openshift_storage_usage_line_items_daily
WHERE source = {{ocp_source_uuid}}
AND year = {{year}}
AND month = {{month}}
AND interval_start >= {{start_date}}
AND interval_start < date_add('day', 1, {{end_date}})
),
cte_matchable_resource_names AS (
SELECT resource_names.lineitem_resourceid
FROM cte_aws_resource_names AS resource_names
JOIN cte_array_agg_nodes AS nodes
ON strpos(resource_names.lineitem_resourceid, nodes.resource_id) != 0

UNION

SELECT resource_names.lineitem_resourceid
FROM cte_aws_resource_names AS resource_names
JOIN cte_array_agg_volumes AS volumes
ON (
strpos(resource_names.lineitem_resourceid, volumes.persistentvolume) != 0
OR strpos(resource_names.lineitem_resourceid, volumes.csi_volume_handle) != 0
)

),
cte_tag_matches AS (
SELECT * FROM unnest(ARRAY{{matched_tag_array | sqlsafe}}) as t(matched_tag)

UNION

SELECT * FROM unnest(ARRAY['openshift_cluster', 'openshift_node', 'openshift_project']) as t(matched_tag)
),
cte_agg_tags AS (
SELECT array_agg(matched_tag) as matched_tags from cte_tag_matches
)
SELECT aws.lineitem_resourceid,
aws.lineitem_usagestartdate,
aws.bill_payeraccountid,
aws.lineitem_usageaccountid,
aws.lineitem_legalentity,
aws.lineitem_lineitemdescription,
aws.bill_billingentity,
aws.lineitem_productcode,
aws.lineitem_availabilityzone,
aws.lineitem_lineitemtype,
aws.product_productfamily,
aws.product_instancetype,
aws.product_region,
aws.pricing_unit,
aws.resourcetags,
aws.costcategory,
aws.lineitem_usageamount,
aws.lineitem_normalizationfactor,
aws.lineitem_normalizedusageamount,
aws.lineitem_currencycode,
aws.lineitem_unblendedrate,
aws.lineitem_unblendedcost,
aws.lineitem_blendedrate,
aws.lineitem_blendedcost,
aws.pricing_publicondemandcost,
aws.pricing_publicondemandrate,
aws.savingsplan_savingsplaneffectivecost,
aws.product_productname,
aws.bill_invoiceid,
CASE WHEN resource_names.lineitem_resourceid IS NOT NULL
THEN TRUE
ELSE FALSE
END as resource_id_matched,
array_join(filter(tag_matches.matched_tags, x -> STRPOS(resourcetags, x ) != 0), ',') as matched_tag,
aws.source as source,
{{ocp_source_uuid}} as ocp_source,
aws.year,
aws.month,
cast(day(aws.lineitem_usagestartdate) as varchar) as day
FROM hive.{{schema | sqlsafe}}.aws_line_items_daily AS aws
LEFT JOIN cte_matchable_resource_names AS resource_names
-- this matches the endswith method this matching was done with in python
ON substr(aws.lineitem_resourceid, -length(resource_names.lineitem_resourceid)) = resource_names.lineitem_resourceid
LEFT JOIN cte_agg_tags AS tag_matches
ON any_match(tag_matches.matched_tags, x->strpos(resourcetags, x) != 0)
WHERE aws.source = {{aws_source_uuid}}
AND aws.year = {{year}}
AND aws.month= {{month}}
AND aws.lineitem_usagestartdate >= {{start_date}}
AND aws.lineitem_usagestartdate < date_add('day', 1, {{end_date}})
AND (resource_names.lineitem_resourceid IS NOT NULL OR tag_matches.matched_tags IS NOT NULL)
2 changes: 2 additions & 0 deletions koku/masu/management/commands/migrate_trino_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"reporting_ocpusagelineitem_daily_summary",
"azure_openshift_disk_capacities_temp",
"aws_openshift_disk_capacities_temp",
"managed_aws_openshift_daily",
}

manage_table_mapping = {
Expand All @@ -86,6 +87,7 @@
"reporting_ocpusagelineitem_daily_summary": "source",
"azure_openshift_disk_capacities_temp": "ocp_source",
"aws_openshift_disk_capacities_temp": "ocp_source",
"managed_aws_openshift_daily": "ocp_source",
}

VALID_CHARACTERS = re.compile(r"^[\w.-]+$")
Expand Down
12 changes: 12 additions & 0 deletions koku/masu/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ def is_validation_enabled(account): # pragma: no cover
return UNLEASH_CLIENT.is_enabled("cost-management.backend.enable_data_validation", context)


def is_managed_ocp_cloud_processing_enabled(account):
account = convert_account(account)
context = {"schema": account}
return UNLEASH_CLIENT.is_enabled("cost-management.backend.feature-cost-5129-ocp-cloud-processing", context)


def is_managed_ocp_cloud_summary_enabled(account):
account = convert_account(account)
context = {"schema": account}
return UNLEASH_CLIENT.is_enabled("cost-management.backend.feature-cost-5129-ocp-cloud-summary", context)


def is_ocp_amortized_monthly_cost_enabled(account): # pragma: no cover
"""Enable the use of savings plan cost for OCP on AWS -> OCP."""
account = convert_account(account)
Expand Down
16 changes: 15 additions & 1 deletion koku/masu/processor/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from masu.processor import is_customer_large
from masu.processor import is_source_disabled
from masu.processor.tasks import get_report_files
from masu.processor.tasks import process_openshift_on_cloud_trino
from masu.processor.tasks import record_all_manifest_files
from masu.processor.tasks import record_report_status
from masu.processor.tasks import remove_expired_data
Expand Down Expand Up @@ -373,7 +374,20 @@ def start_manifest_processing( # noqa: C901
queue=SUBS_EXTRACTION_QUEUE
)
LOG.info(log_json("start_manifest_processing", msg="created subs_task signature", schema=schema_name))
async_id = chord(report_tasks, group(summary_task, hcs_task, subs_task))()
ocp_on_cloud_trino_task = process_openshift_on_cloud_trino.s(
provider_type=provider_type,
schema_name=schema_name,
provider_uuid=provider_uuid,
tracing_id=tracing_id,
).set(queue=SUMMARY_QUEUE)
LOG.info(
log_json(
"start_manifest_processing",
msg="created ocp_on_cloud_trino_task signature",
schema=schema_name,
)
)
async_id = chord(report_tasks, group(summary_task, hcs_task, subs_task, ocp_on_cloud_trino_task))()
LOG.info(
log_json(
"start_manifest_processing",
Expand Down
15 changes: 15 additions & 0 deletions koku/masu/processor/parquet/ocp_cloud_parquet_report_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#
#
"""Processor to filter cost data for OpenShift and store as parquet."""
import json
import logging
from functools import cached_property

Expand Down Expand Up @@ -276,3 +277,17 @@ def process(self, parquet_base_filename, daily_data_frames):
self.create_partitioned_ocp_on_cloud_parquet(openshift_filtered_data_frame, parquet_base_filename)
else:
self.create_ocp_on_cloud_parquet(openshift_filtered_data_frame, parquet_base_filename)

def process_ocp_cloud_trino(self, start_date, end_date):
"""Populate cloud_openshift_daily trino table via SQL."""
if not (ocp_provider_uuids := self.get_ocp_provider_uuids_tuple()):
return
matched_tags = self.get_matched_tags(ocp_provider_uuids)
matched_tag_strs = []
if matched_tags:
matched_tag_strs = [json.dumps(match).replace("{", "").replace("}", "") for match in matched_tags]

for ocp_provider_uuid in ocp_provider_uuids:
self.db_accessor.populate_ocp_on_cloud_daily_trino(
self.provider_uuid, ocp_provider_uuid, start_date, end_date, matched_tag_strs
)
Loading

0 comments on commit 2160741

Please sign in to comment.