diff --git a/koku/api/provider/models.py b/koku/api/provider/models.py index 4670ef8342..43d80638ff 100644 --- a/koku/api/provider/models.py +++ b/koku/api/provider/models.py @@ -180,6 +180,10 @@ class Meta: PROVIDER_GCP, PROVIDER_GCP_LOCAL, ] + MANAGED_OPENSHIFT_ON_CLOUD_PROVIDER_LIST = [ + PROVIDER_AWS, + PROVIDER_AWS_LOCAL, + ] uuid = models.UUIDField(default=uuid4, primary_key=True) name = models.CharField(max_length=256, null=False) diff --git a/koku/masu/database/aws_report_db_accessor.py b/koku/masu/database/aws_report_db_accessor.py index e03bf08a3b..879d13252f 100644 --- a/koku/masu/database/aws_report_db_accessor.py +++ b/koku/masu/database/aws_report_db_accessor.py @@ -9,6 +9,7 @@ import uuid from dateutil.parser import parse +from django.conf import settings from django.db import connection from django.db.models import F from django.db.models import Q @@ -32,6 +33,7 @@ from reporting.provider.all.models import TagMapping from reporting.provider.aws.models import AWSCostEntryBill from reporting.provider.aws.models import AWSCostEntryLineItemDailySummary +from reporting.provider.aws.models import TRINO_MANAGED_OCP_AWS_DAILY_TABLE from reporting.provider.aws.models import UI_SUMMARY_TABLES from reporting.provider.aws.openshift.models import UI_SUMMARY_TABLES as OCPAWS_UI_SUMMARY_TABLES @@ -176,9 +178,10 @@ def populate_ocp_on_aws_ui_summary_tables_trino( } self._execute_trino_raw_sql_query(sql, sql_params=sql_params, log_ref=f"{table_name}.sql") - def delete_ocp_on_aws_hive_partition_by_day(self, days, aws_source, ocp_source, year, month): + def delete_ocp_on_aws_hive_partition_by_day( + self, days, aws_source, ocp_source, year, month, table="reporting_ocpawscostlineitem_project_daily_summary" + ): """Deletes partitions individually for each day in days list.""" - table = "reporting_ocpawscostlineitem_project_daily_summary" if self.schema_exists_trino() and self.table_exists_trino(table): LOG.info( log_json( @@ -193,16 +196,20 @@ def 
delete_ocp_on_aws_hive_partition_by_day(self, days, aws_source, ocp_source, ) ) for day in days: + if table == TRINO_MANAGED_OCP_AWS_DAILY_TABLE: + column_name = "source" + else: + column_name = "aws_source" sql = f""" DELETE FROM hive.{self.schema}.{table} - WHERE aws_source = '{aws_source}' + WHERE {column_name} = '{aws_source}' AND ocp_source = '{ocp_source}' AND year = '{year}' AND (month = replace(ltrim(replace('{month}', '0', ' ')),' ', '0') OR month = '{month}') AND day = '{day}'""" self._execute_trino_raw_sql_query( sql, - log_ref=f"delete_ocp_on_aws_hive_partition_by_day for {year}-{month}-{day}", + log_ref=f"delete_ocp_on_aws_hive_partition_by_day for {year}-{month}-{day} from {table}", ) def populate_ocp_on_aws_cost_daily_summary_trino( @@ -468,3 +475,45 @@ def populate_ec2_compute_summary_table_trino(self, source_uuid, start_date, bill } self._execute_trino_raw_sql_query(sql, sql_params=sql_params, log_ref=f"{table_name}.sql") + + def populate_ocp_on_cloud_daily_trino( + self, aws_provider_uuid, openshift_provider_uuid, start_date, end_date, matched_tags + ): + """Populate the aws_openshift_daily trino table for OCP on AWS. + Args: + aws_provider_uuid (UUID) AWS source UUID. + openshift_provider_uuid (UUID) OCP source UUID. + start_date (datetime.date) The date to start populating the table. + end_date (datetime.date) The date to end on. + matched_tags (list) matching tags. 
+ Returns + (None) + """ + if type(start_date) == str: + start_date = parse(start_date).astimezone(tz=settings.UTC) + if type(end_date) == str: + end_date = parse(end_date).astimezone(tz=settings.UTC) + year = start_date.strftime("%Y") + month = start_date.strftime("%m") + table = TRINO_MANAGED_OCP_AWS_DAILY_TABLE + days = self.date_helper.list_days(start_date, end_date) + days_tup = tuple(str(day.day) for day in days) + self.delete_ocp_on_aws_hive_partition_by_day( + days_tup, aws_provider_uuid, openshift_provider_uuid, year, month, table + ) + + summary_sql = pkgutil.get_data("masu.database", "trino_sql/aws/openshift/managed_aws_openshift_daily.sql") + summary_sql = summary_sql.decode("utf-8") + summary_sql_params = { + "schema": self.schema, + "start_date": start_date, + "year": year, + "month": month, + "days": days_tup, + "end_date": end_date, + "aws_source_uuid": aws_provider_uuid, + "ocp_source_uuid": openshift_provider_uuid, + "matched_tag_array": matched_tags, + } + LOG.info(log_json(msg="running managed OCP on AWS daily SQL", **summary_sql_params)) + self._execute_trino_multipart_sql_query(summary_sql, bind_params=summary_sql_params) diff --git a/koku/masu/database/trino_sql/aws/openshift/managed_aws_openshift_daily.sql b/koku/masu/database/trino_sql/aws/openshift/managed_aws_openshift_daily.sql new file mode 100644 index 0000000000..4a1eb3c8c4 --- /dev/null +++ b/koku/masu/database/trino_sql/aws/openshift/managed_aws_openshift_daily.sql @@ -0,0 +1,190 @@ +-- Now create our proper table if it does not exist +CREATE TABLE IF NOT EXISTS hive.{{schema | sqlsafe}}.managed_aws_openshift_daily +( + lineitem_resourceid varchar, + lineitem_usagestartdate timestamp(3), + bill_payeraccountid varchar, + lineitem_usageaccountid varchar, + lineitem_legalentity varchar, + lineitem_lineitemdescription varchar, + bill_billingentity varchar, + lineitem_productcode varchar, + lineitem_availabilityzone varchar, + lineitem_lineitemtype varchar, + lineitem_usagetype varchar, 
+ lineitem_operation varchar, + product_productfamily varchar, + product_instancetype varchar, + product_region varchar, + pricing_unit varchar, + resourcetags varchar, + costcategory varchar, + lineitem_usageamount double, + lineitem_normalizationfactor double, + lineitem_normalizedusageamount double, + lineitem_currencycode varchar, + lineitem_unblendedrate double, + lineitem_unblendedcost double, + lineitem_blendedrate double, + lineitem_blendedcost double, + pricing_publicondemandcost double, + pricing_publicondemandrate double, + savingsplan_savingsplaneffectivecost double, + bill_invoiceid varchar, + product_productname varchar, + product_vcpu varchar, + product_memory varchar, + resource_id_matched boolean, + matched_tag varchar, + source varchar, + ocp_source varchar, + year varchar, + month varchar, + day varchar +) WITH(format = 'PARQUET', partitioned_by=ARRAY['source', 'ocp_source', 'year', 'month', 'day']) +; + +-- Direct resource matching +INSERT INTO hive.{{schema | sqlsafe}}.managed_aws_openshift_daily ( + lineitem_resourceid, + lineitem_usagestartdate, + bill_payeraccountid, + lineitem_usageaccountid, + lineitem_legalentity, + lineitem_lineitemdescription, + bill_billingentity, + lineitem_productcode, + lineitem_availabilityzone, + lineitem_lineitemtype, + product_productfamily, + product_instancetype, + product_region, + pricing_unit, + resourcetags, + costcategory, + lineitem_usageamount, + lineitem_normalizationfactor, + lineitem_normalizedusageamount, + lineitem_currencycode, + lineitem_unblendedrate, + lineitem_unblendedcost, + lineitem_blendedrate, + lineitem_blendedcost, + pricing_publicondemandcost, + pricing_publicondemandrate, + savingsplan_savingsplaneffectivecost, + product_productname, + bill_invoiceid, + resource_id_matched, + matched_tag, + source, + ocp_source, + year, + month, + day +) +WITH cte_aws_resource_names AS ( + SELECT DISTINCT lineitem_resourceid + FROM hive.{{schema | sqlsafe}}.aws_line_items_daily + WHERE source = 
{{aws_source_uuid}} + AND year = {{year}} + AND month = {{month}} + AND lineitem_usagestartdate >= {{start_date}} + AND lineitem_usagestartdate < date_add('day', 1, {{end_date}}) +), +cte_array_agg_nodes AS ( + SELECT DISTINCT resource_id + FROM hive.{{schema | sqlsafe}}.openshift_pod_usage_line_items_daily + WHERE source = {{ocp_source_uuid}} + AND year = {{year}} + AND month = {{month}} + AND interval_start >= {{start_date}} + AND interval_start < date_add('day', 1, {{end_date}}) +), +cte_array_agg_volumes AS ( + SELECT DISTINCT persistentvolume, csi_volume_handle + FROM hive.{{schema | sqlsafe}}.openshift_storage_usage_line_items_daily + WHERE source = {{ocp_source_uuid}} + AND year = {{year}} + AND month = {{month}} + AND interval_start >= {{start_date}} + AND interval_start < date_add('day', 1, {{end_date}}) +), +cte_matchable_resource_names AS ( + SELECT resource_names.lineitem_resourceid + FROM cte_aws_resource_names AS resource_names + JOIN cte_array_agg_nodes AS nodes + ON strpos(resource_names.lineitem_resourceid, nodes.resource_id) != 0 + + UNION + + SELECT resource_names.lineitem_resourceid + FROM cte_aws_resource_names AS resource_names + JOIN cte_array_agg_volumes AS volumes + ON ( + strpos(resource_names.lineitem_resourceid, volumes.persistentvolume) != 0 + OR strpos(resource_names.lineitem_resourceid, volumes.csi_volume_handle) != 0 + ) + +), +cte_tag_matches AS ( + SELECT * FROM unnest(ARRAY{{matched_tag_array | sqlsafe}}) as t(matched_tag) + + UNION + + SELECT * FROM unnest(ARRAY['openshift_cluster', 'openshift_node', 'openshift_project']) as t(matched_tag) +), +cte_agg_tags AS ( + SELECT array_agg(matched_tag) as matched_tags from cte_tag_matches +) +SELECT aws.lineitem_resourceid, + aws.lineitem_usagestartdate, + aws.bill_payeraccountid, + aws.lineitem_usageaccountid, + aws.lineitem_legalentity, + aws.lineitem_lineitemdescription, + aws.bill_billingentity, + aws.lineitem_productcode, + aws.lineitem_availabilityzone, + aws.lineitem_lineitemtype, 
+ aws.product_productfamily, + aws.product_instancetype, + aws.product_region, + aws.pricing_unit, + aws.resourcetags, + aws.costcategory, + aws.lineitem_usageamount, + aws.lineitem_normalizationfactor, + aws.lineitem_normalizedusageamount, + aws.lineitem_currencycode, + aws.lineitem_unblendedrate, + aws.lineitem_unblendedcost, + aws.lineitem_blendedrate, + aws.lineitem_blendedcost, + aws.pricing_publicondemandcost, + aws.pricing_publicondemandrate, + aws.savingsplan_savingsplaneffectivecost, + aws.product_productname, + aws.bill_invoiceid, + CASE WHEN resource_names.lineitem_resourceid IS NOT NULL + THEN TRUE + ELSE FALSE + END as resource_id_matched, + array_join(filter(tag_matches.matched_tags, x -> STRPOS(resourcetags, x ) != 0), ',') as matched_tag, + aws.source as source, + {{ocp_source_uuid}} as ocp_source, + aws.year, + aws.month, + cast(day(aws.lineitem_usagestartdate) as varchar) as day +FROM hive.{{schema | sqlsafe}}.aws_line_items_daily AS aws +LEFT JOIN cte_matchable_resource_names AS resource_names + -- this matches the endswith method this matching was done with in python + ON substr(aws.lineitem_resourceid, -length(resource_names.lineitem_resourceid)) = resource_names.lineitem_resourceid +LEFT JOIN cte_agg_tags AS tag_matches + ON any_match(tag_matches.matched_tags, x->strpos(resourcetags, x) != 0) +WHERE aws.source = {{aws_source_uuid}} + AND aws.year = {{year}} + AND aws.month= {{month}} + AND aws.lineitem_usagestartdate >= {{start_date}} + AND aws.lineitem_usagestartdate < date_add('day', 1, {{end_date}}) + AND (resource_names.lineitem_resourceid IS NOT NULL OR tag_matches.matched_tags IS NOT NULL) diff --git a/koku/masu/management/commands/migrate_trino_tables.py b/koku/masu/management/commands/migrate_trino_tables.py index c4cb86d8a1..9a04d868b8 100644 --- a/koku/masu/management/commands/migrate_trino_tables.py +++ b/koku/masu/management/commands/migrate_trino_tables.py @@ -68,6 +68,7 @@ "reporting_ocpusagelineitem_daily_summary", 
"azure_openshift_disk_capacities_temp", "aws_openshift_disk_capacities_temp", + "managed_aws_openshift_daily", } manage_table_mapping = { @@ -86,6 +87,7 @@ "reporting_ocpusagelineitem_daily_summary": "source", "azure_openshift_disk_capacities_temp": "ocp_source", "aws_openshift_disk_capacities_temp": "ocp_source", + "managed_aws_openshift_daily": "ocp_source", } VALID_CHARACTERS = re.compile(r"^[\w.-]+$") diff --git a/koku/masu/processor/__init__.py b/koku/masu/processor/__init__.py index 12ab277925..aec7d275ed 100644 --- a/koku/masu/processor/__init__.py +++ b/koku/masu/processor/__init__.py @@ -108,6 +108,18 @@ def is_validation_enabled(account): # pragma: no cover return UNLEASH_CLIENT.is_enabled("cost-management.backend.enable_data_validation", context) +def is_managed_ocp_cloud_processing_enabled(account): + account = convert_account(account) + context = {"schema": account} + return UNLEASH_CLIENT.is_enabled("cost-management.backend.feature-cost-5129-ocp-cloud-processing", context) + + +def is_managed_ocp_cloud_summary_enabled(account): + account = convert_account(account) + context = {"schema": account} + return UNLEASH_CLIENT.is_enabled("cost-management.backend.feature-cost-5129-ocp-cloud-summary", context) + + def is_ocp_amortized_monthly_cost_enabled(account): # pragma: no cover """Enable the use of savings plan cost for OCP on AWS -> OCP.""" account = convert_account(account) diff --git a/koku/masu/processor/orchestrator.py b/koku/masu/processor/orchestrator.py index b5fc9ae717..de8d2f58f3 100644 --- a/koku/masu/processor/orchestrator.py +++ b/koku/masu/processor/orchestrator.py @@ -30,6 +30,7 @@ from masu.processor import is_customer_large from masu.processor import is_source_disabled from masu.processor.tasks import get_report_files +from masu.processor.tasks import process_openshift_on_cloud_trino from masu.processor.tasks import record_all_manifest_files from masu.processor.tasks import record_report_status from masu.processor.tasks import 
remove_expired_data @@ -373,7 +374,20 @@ def start_manifest_processing( # noqa: C901 queue=SUBS_EXTRACTION_QUEUE ) LOG.info(log_json("start_manifest_processing", msg="created subs_task signature", schema=schema_name)) - async_id = chord(report_tasks, group(summary_task, hcs_task, subs_task))() + ocp_on_cloud_trino_task = process_openshift_on_cloud_trino.s( + provider_type=provider_type, + schema_name=schema_name, + provider_uuid=provider_uuid, + tracing_id=tracing_id, + ).set(queue=SUMMARY_QUEUE) + LOG.info( + log_json( + "start_manifest_processing", + msg="created ocp_on_cloud_trino_task signature", + schema=schema_name, + ) + ) + async_id = chord(report_tasks, group(summary_task, hcs_task, subs_task, ocp_on_cloud_trino_task))() LOG.info( log_json( "start_manifest_processing", diff --git a/koku/masu/processor/parquet/ocp_cloud_parquet_report_processor.py b/koku/masu/processor/parquet/ocp_cloud_parquet_report_processor.py index 1fc4eba92e..e2b33c3f44 100644 --- a/koku/masu/processor/parquet/ocp_cloud_parquet_report_processor.py +++ b/koku/masu/processor/parquet/ocp_cloud_parquet_report_processor.py @@ -4,6 +4,7 @@ # # """Processor to filter cost data for OpenShift and store as parquet.""" +import json import logging from functools import cached_property @@ -276,3 +277,17 @@ def process(self, parquet_base_filename, daily_data_frames): self.create_partitioned_ocp_on_cloud_parquet(openshift_filtered_data_frame, parquet_base_filename) else: self.create_ocp_on_cloud_parquet(openshift_filtered_data_frame, parquet_base_filename) + + def process_ocp_cloud_trino(self, start_date, end_date): + """Populate cloud_openshift_daily trino table via SQL.""" + if not (ocp_provider_uuids := self.get_ocp_provider_uuids_tuple()): + return + matched_tags = self.get_matched_tags(ocp_provider_uuids) + matched_tag_strs = [] + if matched_tags: + matched_tag_strs = [json.dumps(match).replace("{", "").replace("}", "") for match in matched_tags] + + for ocp_provider_uuid in ocp_provider_uuids: 
+ self.db_accessor.populate_ocp_on_cloud_daily_trino( + self.provider_uuid, ocp_provider_uuid, start_date, end_date, matched_tag_strs + ) diff --git a/koku/masu/processor/tasks.py b/koku/masu/processor/tasks.py index 599e29e657..f29f1d2523 100644 --- a/koku/masu/processor/tasks.py +++ b/koku/masu/processor/tasks.py @@ -43,6 +43,7 @@ from masu.exceptions import MasuProviderError from masu.external.downloader.report_downloader_base import ReportDownloaderWarning from masu.external.report_downloader import ReportDownloaderError +from masu.processor import is_managed_ocp_cloud_processing_enabled from masu.processor import is_ocp_on_cloud_summary_disabled from masu.processor import is_rate_limit_customer_large from masu.processor import is_source_disabled @@ -82,6 +83,68 @@ UPDATE_SUMMARY_TABLES_TASK = "masu.processor.tasks.update_summary_tables" +def deduplicate_summary_reports(reports_to_summarize, manifest_list): + """Take a list of reports to summarize and deduplicate them.""" + reports_by_source = defaultdict(list) + schema_name = None + for report in reports_to_summarize: + if report: + reports_by_source[report.get("provider_uuid")].append(report) + + if schema_name is None: + # Only set the schema name once + schema_name = report.get("schema_name") + + reports_deduplicated = [] + dedup_func_map = { + Provider.PROVIDER_GCP: deduplicate_reports_for_gcp, + Provider.PROVIDER_GCP_LOCAL: deduplicate_reports_for_gcp, + Provider.PROVIDER_OCI: deduplicate_reports_for_oci, + Provider.PROVIDER_OCI_LOCAL: deduplicate_reports_for_oci, + } + + kwargs = {} + if schema_name: + kwargs["schema_name"] = schema_name + + LOG.info(log_json("summarize_reports", msg="deduplicating reports", **kwargs)) + for report_list in reports_by_source.values(): + if report and report.get("provider_type") in dedup_func_map: + provider_type = report.get("provider_type") + manifest_list = [] if "oci" in provider_type.lower() else manifest_list + dedup_func = dedup_func_map.get(provider_type) + 
reports_deduplicated.extend(dedup_func(report_list)) + else: + starts = [] + ends = [] + for report in report_list: + if report.get("start") and report.get("end"): + starts.append(report.get("start")) + ends.append(report.get("end")) + start = min(starts) if starts != [] else None + end = max(ends) if ends != [] else None + reports_deduplicated.append( + { + "manifest_id": report.get("manifest_id"), + "tracing_id": report.get("tracing_id"), + "schema_name": report.get("schema_name"), + "provider_type": report.get("provider_type"), + "provider_uuid": report.get("provider_uuid"), + "start": start, + "end": end, + } + ) + + LOG.info( + log_json( + "summarize_reports", + msg=f"deduplicated reports, num report: {len(reports_deduplicated)}", + **kwargs, + ) + ) + return reports_deduplicated + + def delayed_summarize_current_month(schema_name: str, provider_uuids: list, provider_type: str): """Delay Resummarize provider data for the current month.""" queue = get_customer_queue(schema_name, SummaryQueue) @@ -352,64 +415,9 @@ def summarize_reports( # noqa: C901 None """ - reports_by_source = defaultdict(list) - schema_name = None - for report in reports_to_summarize: - if report: - reports_by_source[report.get("provider_uuid")].append(report) - - if schema_name is None: - # Only set the schema name once - schema_name = report.get("schema_name") - - reports_deduplicated = [] - dedup_func_map = { - Provider.PROVIDER_GCP: deduplicate_reports_for_gcp, - Provider.PROVIDER_GCP_LOCAL: deduplicate_reports_for_gcp, - Provider.PROVIDER_OCI: deduplicate_reports_for_oci, - Provider.PROVIDER_OCI_LOCAL: deduplicate_reports_for_oci, - } - - kwargs = {} - if schema_name: - kwargs["schema_name"] = schema_name - - LOG.info(log_json("summarize_reports", msg="deduplicating reports", **kwargs)) - for report_list in reports_by_source.values(): - if report and report.get("provider_type") in dedup_func_map: - provider_type = report.get("provider_type") - manifest_list = [] if "oci" in 
provider_type.lower() else manifest_list - dedup_func = dedup_func_map.get(provider_type) - reports_deduplicated.extend(dedup_func(report_list)) - else: - starts = [] - ends = [] - for report in report_list: - if report.get("start") and report.get("end"): - starts.append(report.get("start")) - ends.append(report.get("end")) - start = min(starts) if starts != [] else None - end = max(ends) if ends != [] else None - reports_deduplicated.append( - { - "manifest_id": report.get("manifest_id"), - "tracing_id": report.get("tracing_id"), - "schema_name": report.get("schema_name"), - "provider_type": report.get("provider_type"), - "provider_uuid": report.get("provider_uuid"), - "start": start, - "end": end, - } - ) - - LOG.info( - log_json( - "summarize_reports", - msg=f"deduplicated reports, num report: {len(reports_deduplicated)}", - **kwargs, - ) - ) + reports_deduplicated = deduplicate_summary_reports(reports_to_summarize, manifest_list) for report in reports_deduplicated: + schema_name = report.get("schema_name") # For day-to-day summarization we choose a small window to # cover new data from a window of days. 
# This saves us from re-summarizing unchanged data and cuts down @@ -429,7 +437,7 @@ def summarize_reports( # noqa: C901 months = get_months_in_date_range(report) for month in months: update_summary_tables.s( - report.get("schema_name"), + schema_name, report.get("provider_type"), report.get("provider_uuid"), start_date=month[0], @@ -1266,3 +1274,43 @@ def validate_daily_data(schema, start_date, end_date, provider_uuid, ocp_on_clou data_validator.check_data_integrity() else: LOG.info(log_json(msg="skipping validation, disabled for schema", context=context)) + + +@celery_app.task(name="masu.processor.tasks.process_openshift_on_cloud_trino", queue=SummaryQueue.DEFAULT, bind=True) +def process_openshift_on_cloud_trino( + self, reports_to_summarize, provider_type, schema_name, provider_uuid, tracing_id +): + """Process OCP on Cloud data into managed tables for summary""" + reports_deduplicated = deduplicate_summary_reports(reports_to_summarize, manifest_list=[]) + for report in reports_deduplicated: + schema_name = report.get("schema_name") + ctx = {"provider_type": provider_type, "schema_name": schema_name, "provider_uuid": provider_uuid} + if provider_type not in Provider.MANAGED_OPENSHIFT_ON_CLOUD_PROVIDER_LIST: + LOG.info( + log_json(tracing_id, msg="provider type not valid for COST-5129 OCP on cloud processing", context=ctx) + ) + continue + if not is_managed_ocp_cloud_processing_enabled(schema_name): + LOG.info( + log_json(tracing_id, msg="provider not enabled for COST-5129 OCP on cloud processing", context=ctx) + ) + continue + with ReportManifestDBAccessor() as manifest_accesor: + tracing_id = report.get("tracing_id", report.get("manifest_uuid", "no-tracing-id")) + + if not manifest_accesor.manifest_ready_for_summary(report.get("manifest_id")): + LOG.info(log_json(tracing_id, msg="manifest not ready for summary", context=report)) + continue + + LOG.info(log_json(tracing_id, msg="report to summarize", context=report)) + + months = 
get_months_in_date_range(report) + for month in months: + start_date = month[0] + end_date = month[1] + # invoice_month = month[2] + ctx["start_date"] = start_date + ctx["end_date"] = end_date + manifest_id = report.get("manifest_id") + processor = OCPCloudParquetReportProcessor(schema_name, "", provider_uuid, provider_type, manifest_id, ctx) + processor.process_ocp_cloud_trino(start_date, end_date) diff --git a/koku/masu/test/database/test_aws_report_db_accessor.py b/koku/masu/test/database/test_aws_report_db_accessor.py index 2ce03b09fb..df3dda1ad2 100644 --- a/koku/masu/test/database/test_aws_report_db_accessor.py +++ b/koku/masu/test/database/test_aws_report_db_accessor.py @@ -36,6 +36,7 @@ from reporting.provider.aws.models import AWSCostEntryBill from reporting.provider.aws.models import AWSCostEntryLineItemDailySummary from reporting.provider.aws.models import AWSCostEntryLineItemSummaryByEC2Compute +from reporting.provider.aws.models import TRINO_MANAGED_OCP_AWS_DAILY_TABLE class AWSReportDBAccessorTest(MasuTestCase): @@ -288,6 +289,44 @@ def test_delete_ocp_on_aws_hive_partition_by_day( mock_connect.assert_called() self.assertEqual(mock_connect.call_count, settings.HIVE_PARTITION_DELETE_RETRIES) + @patch("masu.database.aws_report_db_accessor.AWSReportDBAccessor.schema_exists_trino") + @patch("masu.database.aws_report_db_accessor.AWSReportDBAccessor.table_exists_trino") + @patch("masu.database.report_db_accessor_base.trino_db.connect") + @patch("time.sleep", return_value=None) + def test_delete_ocp_on_aws_hive_partition_by_day_managed_table( + self, mock_sleep, mock_connect, mock_table_exists, mock_schema_exists + ): + """Test that deletions work with retries.""" + mock_schema_exists.return_value = False + self.accessor.delete_ocp_on_aws_hive_partition_by_day( + [1], + self.aws_provider_uuid, + self.ocp_provider_uuid, + "2022", + "01", + TRINO_MANAGED_OCP_AWS_DAILY_TABLE, + ) + mock_connect.assert_not_called() + + mock_connect.reset_mock() + + 
mock_schema_exists.return_value = True + attrs = {"cursor.side_effect": TrinoExternalError({"errorName": "HIVE_METASTORE_ERROR"})} + mock_connect.return_value = Mock(**attrs) + + with self.assertRaises(TrinoHiveMetastoreError): + self.accessor.delete_ocp_on_aws_hive_partition_by_day( + [1], + self.aws_provider_uuid, + self.ocp_provider_uuid, + "2022", + "01", + TRINO_MANAGED_OCP_AWS_DAILY_TABLE, + ) + + mock_connect.assert_called() + self.assertEqual(mock_connect.call_count, settings.HIVE_PARTITION_DELETE_RETRIES) + @patch("masu.database.aws_report_db_accessor.AWSReportDBAccessor._execute_trino_raw_sql_query") def test_check_for_matching_enabled_keys_no_matches(self, mock_trino): """Test that Trino is used to find matched tags.""" @@ -479,3 +518,29 @@ def test_populate_ec2_compute_summary_table_trino(self, mock_trino): self.aws_provider_uuid, start_date, current_bill_id, markup_value ) mock_trino.assert_called() + + @patch("masu.database.aws_report_db_accessor.AWSReportDBAccessor.delete_ocp_on_aws_hive_partition_by_day") + @patch("masu.database.aws_report_db_accessor.AWSReportDBAccessor._execute_trino_multipart_sql_query") + def test_populate_ocp_on_cloud_daily_trino(self, mock_trino, mock_partition_delete): + """ + Test that calling ocp on cloud populate triggers the deletes and summary sql. 
+ """ + start_date = "2024-08-01" + end_date = "2024-08-05" + year = "2024" + month = "08" + matched_tags = "fake-tags" + expected_days = ("1", "2", "3", "4", "5") + + self.accessor.populate_ocp_on_cloud_daily_trino( + self.aws_provider_uuid, self.ocp_provider_uuid, start_date, end_date, matched_tags + ) + mock_partition_delete.assert_called_with( + expected_days, + self.aws_provider_uuid, + self.ocp_provider_uuid, + year, + month, + TRINO_MANAGED_OCP_AWS_DAILY_TABLE, + ) + mock_trino.assert_called() diff --git a/koku/masu/test/processor/parquet/test_ocp_cloud_parquet_report_processor.py b/koku/masu/test/processor/parquet/test_ocp_cloud_parquet_report_processor.py index b62d5061bb..1a3599a41b 100644 --- a/koku/masu/test/processor/parquet/test_ocp_cloud_parquet_report_processor.py +++ b/koku/masu/test/processor/parquet/test_ocp_cloud_parquet_report_processor.py @@ -461,3 +461,34 @@ def test_instantiating_processor_with_manifest_id(self): """Assert that report_status exists and is not None.""" self.assertIsNotNone(self.report_processor.report_status) self.assertIsInstance(self.report_processor.report_status, CostUsageReportStatus) + + def test_process_ocp_cloud_trino(self): + """Test that processing ocp on cloud via trino calls the expected functions.""" + start_date = "2024-08-01" + end_date = "2024-08-05" + ocp_uuids = (self.ocp_provider_uuid,) + matched_tags = [] + with patch( + ( + "masu.processor.parquet.ocp_cloud_parquet_report_processor" + ".OCPCloudParquetReportProcessor.get_ocp_provider_uuids_tuple" + ), + return_value=ocp_uuids, + ), patch( + "masu.processor.parquet.ocp_cloud_parquet_report_processor.OCPCloudParquetReportProcessor.get_matched_tags", + return_value=matched_tags, + ), patch( + "masu.processor.parquet.ocp_cloud_parquet_report_processor.OCPCloudParquetReportProcessor.db_accessor" + ) as accessor: + rp = OCPCloudParquetReportProcessor( + schema_name=self.schema, + report_path=self.report_path, + provider_uuid=self.aws_provider_uuid, + 
provider_type=Provider.PROVIDER_AWS_LOCAL, + manifest_id=self.manifest_id, + context={"request_id": self.request_id, "start_date": self.start_date, "create_table": True}, + ) + rp.process_ocp_cloud_trino(start_date, end_date) + accessor.populate_ocp_on_cloud_daily_trino.assert_called_with( + self.aws_provider_uuid, self.ocp_provider_uuid, start_date, end_date, matched_tags + ) diff --git a/koku/masu/test/processor/test_tasks.py b/koku/masu/test/processor/test_tasks.py index 18f651618d..3b0a26a307 100644 --- a/koku/masu/test/processor/test_tasks.py +++ b/koku/masu/test/processor/test_tasks.py @@ -55,6 +55,7 @@ from masu.processor.tasks import normalize_table_options from masu.processor.tasks import process_daily_openshift_on_cloud from masu.processor.tasks import process_openshift_on_cloud +from masu.processor.tasks import process_openshift_on_cloud_trino from masu.processor.tasks import record_all_manifest_files from masu.processor.tasks import record_report_status from masu.processor.tasks import remove_expired_data @@ -1816,3 +1817,93 @@ def test_remove_stale_tenant(self): after_len = Tenant.objects.count() self.assertGreater(before_len, after_len) self.assertEqual(KokuTenantMiddleware.tenant_cache.currsize, 0) + + +class TestProcessOpenshiftOnCloudTrino(MasuTestCase): + @patch( + "masu.processor.tasks.is_managed_ocp_cloud_processing_enabled", + return_value=True, + ) + @patch("masu.processor.tasks.OCPCloudParquetReportProcessor.process_ocp_cloud_trino") + def test_process_openshift_on_cloud_trino(self, mock_process, mock_unleash): + """Test that the process_openshift_on_cloud_trino task performs expected functions""" + start = "2024-08-01" + end = "2024-08-05" + reports = [ + { + "schema_name": self.schema, + "provider_type": self.aws_provider.type, + "provider_uuid": str(self.aws_provider.uuid), + "tracing_id": "", + "start": start, + "end": end, + "manifest_id": 1, + } + ] + process_openshift_on_cloud_trino(reports, self.aws_provider.type, self.schema, 
self.provider_uuid, "") + mock_process.assert_called_with(start, end) + + @patch("masu.processor.tasks.OCPCloudParquetReportProcessor.process_ocp_cloud_trino") + def test_process_openshift_on_cloud_trino_unleash_false(self, mock_process): + """Test that the process_openshift_on_cloud_trino task performs expected functions""" + start = "2024-08-01" + end = "2024-08-05" + reports = [ + { + "schema_name": self.schema, + "provider_type": self.aws_provider.type, + "provider_uuid": str(self.aws_provider.uuid), + "tracing_id": "", + "start": start, + "end": end, + "manifest_id": 1, + } + ] + process_openshift_on_cloud_trino(reports, self.aws_provider.type, self.schema, self.provider_uuid, "") + mock_process.assert_not_called() + + @patch( + "masu.processor.tasks.is_managed_ocp_cloud_processing_enabled", + return_value=True, + ) + @patch("masu.processor.tasks.OCPCloudParquetReportProcessor.process_ocp_cloud_trino") + def test_process_openshift_on_cloud_trino_bad_provider_type(self, mock_process, mock_unleash): + """Test that the process_openshift_on_cloud_trino task performs expected functions""" + start = "2024-08-01" + end = "2024-08-05" + reports = [ + { + "schema_name": self.schema, + "provider_type": self.oci_provider.type, + "provider_uuid": str(self.oci_provider.uuid), + "tracing_id": "", + "start": start, + "end": end, + "manifest_id": 1, + } + ] + process_openshift_on_cloud_trino(reports, self.oci_provider.type, self.schema, self.provider_uuid, "") + mock_process.assert_not_called() + + @patch( + "masu.processor.tasks.is_managed_ocp_cloud_processing_enabled", + return_value=True, + ) + @patch("masu.processor.tasks.OCPCloudParquetReportProcessor.process_ocp_cloud_trino") + def test_process_openshift_on_cloud_trino_manifest_not_ready(self, mock_process, mock_unleash): + """Test that the process_openshift_on_cloud_trino task performs expected functions""" + start = "2024-08-01" + end = "2024-08-05" + reports = [ + { + "schema_name": self.schema, + "provider_type": 
self.oci_provider.type, + "provider_uuid": str(self.aws_provider.uuid), + "tracing_id": "", + "start": start, + "end": end, + "manifest_id": 1000, + } + ] + process_openshift_on_cloud_trino(reports, self.aws_provider.type, self.schema, self.provider_uuid, "") + mock_process.assert_not_called() diff --git a/koku/reporting/models.py b/koku/reporting/models.py index 75568d011c..21f18e8fca 100644 --- a/koku/reporting/models.py +++ b/koku/reporting/models.py @@ -176,6 +176,7 @@ "reporting_ocpgcpcostlineitem_project_daily_summary_temp": "ocp_source", "gcp_openshift_daily_resource_matched_temp": "ocp_source", "gcp_openshift_daily_tag_matched_temp": "ocp_source", + "managed_aws_openshift_daily": "ocp_source", } # These are cleaned during expired_data flow diff --git a/koku/reporting/provider/aws/models.py b/koku/reporting/provider/aws/models.py index 6859314b61..a9e50b768d 100644 --- a/koku/reporting/provider/aws/models.py +++ b/koku/reporting/provider/aws/models.py @@ -32,6 +32,7 @@ TRINO_LINE_ITEM_TABLE = "aws_line_items" TRINO_LINE_ITEM_DAILY_TABLE = "aws_line_items_daily" TRINO_OCP_ON_AWS_DAILY_TABLE = "aws_openshift_daily" +TRINO_MANAGED_OCP_AWS_DAILY_TABLE = "managed_aws_openshift_daily" TRINO_REQUIRED_COLUMNS = { "bill/BillingEntity": "",